From bbd65fd6123f7d1a93d1f6a68f2b933d53ec3c23 Mon Sep 17 00:00:00 2001
From: Mohamed Bassem
Date: Sun, 8 Feb 2026 02:11:51 +0000
Subject: feat: Add separate queue for import link crawling (#2452)

* feat: add separate queue for import link crawling

---------

Co-authored-by: Claude
---
 apps/workers/index.ts                 |  7 +++-
 apps/workers/workers/crawlerWorker.ts | 75 ++++++++++++++++++++---------------
 apps/workers/workers/importWorker.ts  |  4 +-
 3 files changed, 51 insertions(+), 35 deletions(-)

(limited to 'apps')

diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 931a505f..c7b9533d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -10,6 +10,7 @@ import {
   initTracing,
   LinkCrawlerQueue,
   loadAllPlugins,
+  LowPriorityCrawlerQueue,
   OpenAIQueue,
   prepareQueue,
   RuleEngineQueue,
@@ -38,7 +39,11 @@ import { WebhookWorker } from "./workers/webhookWorker";
 const workerBuilders = {
   crawler: async () => {
     await LinkCrawlerQueue.ensureInit();
-    return CrawlerWorker.build();
+    return CrawlerWorker.build(LinkCrawlerQueue);
+  },
+  lowPriorityCrawler: async () => {
+    await LowPriorityCrawlerQueue.ensureInit();
+    return CrawlerWorker.build(LowPriorityCrawlerQueue);
   },
   inference: async () => {
     await OpenAIQueue.ensureInit();
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 48ea5352..5869354f 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -59,7 +59,6 @@
 import {
   AssetPreprocessingQueue,
   getTracer,
-  LinkCrawlerQueue,
   OpenAIQueue,
   QuotaService,
   setSpanAttributes,
@@ -84,8 +83,10 @@ import serverConfig from "@karakeep/shared/config";
 import logger from "@karakeep/shared/logger";
 import {
   DequeuedJob,
+  DequeuedJobError,
   EnqueueOptions,
   getQueueClient,
+  Queue,
   QueueRetryAfterError,
 } from "@karakeep/shared/queueing";
 import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
@@ -302,42 +303,54 @@ async function launchBrowser() {
 }
 
 export class CrawlerWorker {
-  static async build() {
-    chromium.use(StealthPlugin());
-    if (serverConfig.crawler.enableAdblocker) {
-      logger.info("[crawler] Loading adblocker ...");
-      const globalBlockerResult = await tryCatch(
-        PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
-          path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
-          read: fs.readFile,
-          write: fs.writeFile,
-        }),
-      );
-      if (globalBlockerResult.error) {
-        logger.error(
-          `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
-        );
-      } else {
-        globalBlocker = globalBlockerResult.data;
-      }
-    }
-    if (!serverConfig.crawler.browserConnectOnDemand) {
-      await launchBrowser();
-    } else {
-      logger.info(
-        "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
-      );
+  private static initPromise: Promise<void> | null = null;
+
+  private static ensureInitialized() {
+    if (!CrawlerWorker.initPromise) {
+      CrawlerWorker.initPromise = (async () => {
+        chromium.use(StealthPlugin());
+        if (serverConfig.crawler.enableAdblocker) {
+          logger.info("[crawler] Loading adblocker ...");
+          const globalBlockerResult = await tryCatch(
+            PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
+              path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
+              read: fs.readFile,
+              write: fs.writeFile,
+            }),
+          );
+          if (globalBlockerResult.error) {
+            logger.error(
+              `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
+            );
+          } else {
+            globalBlocker = globalBlockerResult.data;
+          }
+        }
+        if (!serverConfig.crawler.browserConnectOnDemand) {
+          await launchBrowser();
+        } else {
+          logger.info(
+            "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
+          );
+        }
+        await loadCookiesFromFile();
+      })();
     }
+    return CrawlerWorker.initPromise;
+  }
+
+  static async build(queue: Queue<ZCrawlLinkRequest>) {
+    await CrawlerWorker.ensureInitialized();
 
     logger.info("Starting crawler worker ...");
-    const worker = (await getQueueClient())!.createRunner<
+    const worker = (await getQueueClient()).createRunner<
       ZCrawlLinkRequest,
       CrawlerRunResult
     >(
-      LinkCrawlerQueue,
+      queue,
       {
         run: withWorkerTracing("crawlerWorker.run", runCrawler),
-        onComplete: async (job) => {
+        onComplete: async (job: DequeuedJob<ZCrawlLinkRequest>) => {
           workerStatsCounter.labels("crawler", "completed").inc();
           const jobId = job.id;
           logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -351,7 +364,7 @@
             .where(eq(bookmarkLinks.id, bookmarkId));
         }
       },
-        onError: async (job) => {
+        onError: async (job: DequeuedJobError<ZCrawlLinkRequest>) => {
         workerStatsCounter.labels("crawler", "failed").inc();
         if (job.numRetriesLeft == 0) {
           workerStatsCounter.labels("crawler", "failed_permanent").inc();
@@ -402,8 +415,6 @@
       },
     );
 
-    await loadCookiesFromFile();
-
     return worker;
   }
 }
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index 11a738d7..e5b5c27e 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -20,7 +20,7 @@ import {
   importSessions,
   importStagingBookmarks,
 } from "@karakeep/db/schema";
-import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
+import { LowPriorityCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
 import logger, { throttledLogger } from "@karakeep/shared/logger";
 import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
 
@@ -635,7 +635,7 @@
           ),
         ),
       ),
-      LinkCrawlerQueue.stats(),
+      LowPriorityCrawlerQueue.stats(),
       OpenAIQueue.stats(),
     ]);
 
--
cgit v1.2.3-70-g09d2
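A note on the pattern: the crux of this change is that `CrawlerWorker.build()` is now parameterized by the queue it consumes, so the same crawl logic drains both the interactive `LinkCrawlerQueue` and the new `LowPriorityCrawlerQueue` that the import worker targets, and a large import can no longer starve freshly saved bookmarks. Because this view is limited to `apps/`, the definition of `LowPriorityCrawlerQueue` itself (in `@karakeep/shared-server`) is not shown. Below is a minimal, self-contained sketch of the idea only — `CrawlRequest`, `InMemoryQueue`, and `runWorker` are hypothetical stand-ins, not Karakeep's actual `@karakeep/shared/queueing` API:

```ts
// Hypothetical stand-ins for illustration; not Karakeep's real queueing API.
interface CrawlRequest {
  bookmarkId: string;
  url: string;
}

class InMemoryQueue<T> {
  private jobs: T[] = [];
  constructor(readonly name: string) {}
  enqueue(payload: T): void {
    this.jobs.push(payload);
  }
  dequeue(): T | undefined {
    return this.jobs.shift();
  }
}

// One handler, many queues: the worker is parameterized by the queue it
// consumes, mirroring the new CrawlerWorker.build(queue) signature.
async function runWorker(
  queue: InMemoryQueue<CrawlRequest>,
  handler: (req: CrawlRequest) => Promise<void>,
): Promise<void> {
  for (let job = queue.dequeue(); job !== undefined; job = queue.dequeue()) {
    await handler(job);
  }
}

const linkCrawlerQueue = new InMemoryQueue<CrawlRequest>("crawler");
const lowPriorityCrawlerQueue = new InMemoryQueue<CrawlRequest>("crawler-low-prio");

// Interactive saves land on the main queue; bulk imports land on the
// low-priority queue, each drained by its own worker instance.
linkCrawlerQueue.enqueue({ bookmarkId: "b1", url: "https://example.com" });
lowPriorityCrawlerQueue.enqueue({ bookmarkId: "b2", url: "https://example.org" });

void runWorker(linkCrawlerQueue, async (req) => {
  console.log(`[crawler] ${req.url} for ${req.bookmarkId}`);
});
void runWorker(lowPriorityCrawlerQueue, async (req) => {
  console.log(`[crawler-low-prio] ${req.url} for ${req.bookmarkId}`);
});
```

The isolation comes from each queue having its own runner (and, in deployment, its own worker slot), not from any priority field on the jobs themselves.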
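A note on the init refactor: moving the one-time setup (stealth plugin, adblocker, browser launch, cookie loading) out of `build()` and behind `ensureInitialized()` is what makes it safe for one process to build two crawler workers — the first `build()` kicks off initialization and the second awaits the same cached promise. A distilled sketch of the idiom, with hypothetical names (`setUpOnce`, `heavySetup`):

```ts
// Init-once idiom: cache the Promise, not the result, so concurrent callers
// share a single in-flight initialization instead of racing to start two.
let initPromise: Promise<void> | null = null;

async function heavySetup(): Promise<void> {
  // Stand-in for the real one-time work (plugins, adblocker, browser, cookies).
  await new Promise((resolve) => setTimeout(resolve, 100));
  console.log("initialized exactly once");
}

function setUpOnce(): Promise<void> {
  if (!initPromise) {
    initPromise = heavySetup();
  }
  return initPromise;
}

// Both callers await the same promise; heavySetup's body runs once.
void Promise.all([setUpOnce(), setUpOnce()]);
```

One property worth noting: because the promise stays cached even on rejection, a failed initialization makes every subsequent `build()` fail fast rather than retry.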