author    Mohamed Bassem <me@mbassem.com>  2026-02-08 02:11:51 +0000
committer GitHub <noreply@github.com>      2026-02-08 02:11:51 +0000
commit    bbd65fd6123f7d1a93d1f6a68f2b933d53ec3c23 (patch)
tree      1f23b04b4ca8fbccdea2d61e7c2c0a8b991d87d1
parent    7d53e2e458cba7153dea27c625ca1bb534952ddf (diff)
download  karakeep-bbd65fd6123f7d1a93d1f6a68f2b933d53ec3c23.tar.zst
feat: Add separate queue for import link crawling (#2452)
* feat: add separate queue for import link crawling

---------

Co-authored-by: Claude <noreply@anthropic.com>
-rw-r--r--  apps/workers/index.ts                    7
-rw-r--r--  apps/workers/workers/crawlerWorker.ts   75
-rw-r--r--  apps/workers/workers/importWorker.ts     4
-rw-r--r--  packages/shared-server/src/queues.ts    12
-rw-r--r--  packages/trpc/routers/bookmarks.ts       8
5 files changed, 70 insertions(+), 36 deletions(-)
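
The core of the change: bookmark creation now picks between two crawler queues based on the request's crawl priority, so bulk imports no longer compete with interactively added links for the main queue's runner slots. A minimal sketch of that routing, using the identifiers from this patch (pickCrawlerQueue is a hypothetical helper; the router diff below inlines the same ternary):

import {
  LinkCrawlerQueue,
  LowPriorityCrawlerQueue,
} from "@karakeep/shared-server";

// Both queues carry the same ZCrawlLinkRequest payload; only scheduling
// differs. "low" is the only priority value this patch checks for.
function pickCrawlerQueue(crawlPriority: string | undefined) {
  return crawlPriority === "low" ? LowPriorityCrawlerQueue : LinkCrawlerQueue;
}
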
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 931a505f..c7b9533d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -10,6 +10,7 @@ import {
initTracing,
LinkCrawlerQueue,
loadAllPlugins,
+ LowPriorityCrawlerQueue,
OpenAIQueue,
prepareQueue,
RuleEngineQueue,
@@ -38,7 +39,11 @@ import { WebhookWorker } from "./workers/webhookWorker";
const workerBuilders = {
crawler: async () => {
await LinkCrawlerQueue.ensureInit();
- return CrawlerWorker.build();
+ return CrawlerWorker.build(LinkCrawlerQueue);
+ },
+ lowPriorityCrawler: async () => {
+ await LowPriorityCrawlerQueue.ensureInit();
+ return CrawlerWorker.build(LowPriorityCrawlerQueue);
},
inference: async () => {
await OpenAIQueue.ensureInit();
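
For illustration, the two builder entries above amount to the following when both crawler workers are started; startCrawlers is a hypothetical wrapper, and the real entry point dispatches through the workerBuilders map:

import {
  LinkCrawlerQueue,
  LowPriorityCrawlerQueue,
} from "@karakeep/shared-server";
import { CrawlerWorker } from "./workers/crawlerWorker";

// The same CrawlerWorker implementation drains both queues, each through
// an independent runner, so each queue keeps its own parallelism budget.
async function startCrawlers() {
  await LinkCrawlerQueue.ensureInit();
  await LowPriorityCrawlerQueue.ensureInit();
  return Promise.all([
    CrawlerWorker.build(LinkCrawlerQueue),
    CrawlerWorker.build(LowPriorityCrawlerQueue),
  ]);
}
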
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 48ea5352..5869354f 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -59,7 +59,6 @@ import {
import {
AssetPreprocessingQueue,
getTracer,
- LinkCrawlerQueue,
OpenAIQueue,
QuotaService,
setSpanAttributes,
@@ -84,8 +83,10 @@ import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import {
DequeuedJob,
+ DequeuedJobError,
EnqueueOptions,
getQueueClient,
+ Queue,
QueueRetryAfterError,
} from "@karakeep/shared/queueing";
import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
@@ -302,42 +303,54 @@ async function launchBrowser() {
}
export class CrawlerWorker {
- static async build() {
- chromium.use(StealthPlugin());
- if (serverConfig.crawler.enableAdblocker) {
- logger.info("[crawler] Loading adblocker ...");
- const globalBlockerResult = await tryCatch(
- PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
- path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
- read: fs.readFile,
- write: fs.writeFile,
- }),
- );
- if (globalBlockerResult.error) {
- logger.error(
- `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
- );
- } else {
- globalBlocker = globalBlockerResult.data;
- }
- }
- if (!serverConfig.crawler.browserConnectOnDemand) {
- await launchBrowser();
- } else {
- logger.info(
- "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
- );
+ private static initPromise: Promise<void> | null = null;
+
+ private static ensureInitialized() {
+ if (!CrawlerWorker.initPromise) {
+ CrawlerWorker.initPromise = (async () => {
+ chromium.use(StealthPlugin());
+ if (serverConfig.crawler.enableAdblocker) {
+ logger.info("[crawler] Loading adblocker ...");
+ const globalBlockerResult = await tryCatch(
+ PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, {
+ path: path.join(os.tmpdir(), "karakeep_adblocker.bin"),
+ read: fs.readFile,
+ write: fs.writeFile,
+ }),
+ );
+ if (globalBlockerResult.error) {
+ logger.error(
+ `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`,
+ );
+ } else {
+ globalBlocker = globalBlockerResult.data;
+ }
+ }
+ if (!serverConfig.crawler.browserConnectOnDemand) {
+ await launchBrowser();
+ } else {
+ logger.info(
+ "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance",
+ );
+ }
+ await loadCookiesFromFile();
+ })();
}
+ return CrawlerWorker.initPromise;
+ }
+
+ static async build(queue: Queue<ZCrawlLinkRequest>) {
+ await CrawlerWorker.ensureInitialized();
logger.info("Starting crawler worker ...");
- const worker = (await getQueueClient())!.createRunner<
+ const worker = (await getQueueClient()).createRunner<
ZCrawlLinkRequest,
CrawlerRunResult
>(
- LinkCrawlerQueue,
+ queue,
{
run: withWorkerTracing("crawlerWorker.run", runCrawler),
- onComplete: async (job) => {
+ onComplete: async (job: DequeuedJob<ZCrawlLinkRequest>) => {
workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -351,7 +364,7 @@ export class CrawlerWorker {
.where(eq(bookmarkLinks.id, bookmarkId));
}
},
- onError: async (job) => {
+ onError: async (job: DequeuedJobError<ZCrawlLinkRequest>) => {
workerStatsCounter.labels("crawler", "failed").inc();
if (job.numRetriesLeft == 0) {
workerStatsCounter.labels("crawler", "failed_permanent").inc();
@@ -402,8 +415,6 @@ export class CrawlerWorker {
},
);
- await loadCookiesFromFile();
-
return worker;
}
}
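
Since build() can now be called once per queue, the one-time setup (stealth plugin, adblocker, browser launch, cookie loading) moved behind a memoized promise. The idiom in isolation, as a self-contained sketch:

// Generic form of the initPromise pattern above: the first caller starts
// the async setup, concurrent and later callers await the same promise,
// and the work runs at most once per process.
class OneTimeSetup {
  private static initPromise: Promise<void> | null = null;

  static ensure(): Promise<void> {
    if (!OneTimeSetup.initPromise) {
      OneTimeSetup.initPromise = (async () => {
        // ...expensive one-time initialization goes here...
      })();
    }
    return OneTimeSetup.initPromise;
  }
}

One trade-off of this simple form, which the patch shares: a rejected promise stays cached, so a failed initialization is not retried on the next build() call.
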
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
index 11a738d7..e5b5c27e 100644
--- a/apps/workers/workers/importWorker.ts
+++ b/apps/workers/workers/importWorker.ts
@@ -20,7 +20,7 @@ import {
importSessions,
importStagingBookmarks,
} from "@karakeep/db/schema";
-import { LinkCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
+import { LowPriorityCrawlerQueue, OpenAIQueue } from "@karakeep/shared-server";
import logger, { throttledLogger } from "@karakeep/shared/logger";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -635,7 +635,7 @@ export class ImportWorker {
),
),
),
- LinkCrawlerQueue.stats(),
+ LowPriorityCrawlerQueue.stats(),
OpenAIQueue.stats(),
]);
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index fd9dac83..4d4a61d6 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -96,6 +96,18 @@ export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
},
);
+// Separate queue for low priority link crawling (e.g. imports)
+// This prevents low priority crawling from impacting the parallelism of the main queue
+export const LowPriorityCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
+ "low_priority_crawler_queue",
+ {
+ defaultJobArgs: {
+ numRetries: 5,
+ },
+ keepFailedJobs: false,
+ },
+);
+
// Inference Worker
export const zOpenAIRequestSchema = z.object({
bookmarkId: z.string(),
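
Because both queues are declared with the same ZCrawlLinkRequest schema, producers can target either one with an identical payload. A hedged producer-side sketch (the bookmarkId field matches the router diff below; the enqueue options argument the router passes is truncated there, so it is left out here on the assumption that it is optional):

import { LowPriorityCrawlerQueue } from "@karakeep/shared-server";

// Illustrative only: push an import-originated link onto the low-priority
// queue. "bkm_123" is a placeholder id.
await LowPriorityCrawlerQueue.enqueue({
  bookmarkId: "bkm_123",
});
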
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 0bbf4fb7..565558c3 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -19,6 +19,7 @@ import {
import {
AssetPreprocessingQueue,
LinkCrawlerQueue,
+ LowPriorityCrawlerQueue,
OpenAIQueue,
QueuePriority,
QuotaService,
@@ -282,7 +283,12 @@ export const bookmarksAppRouter = router({
switch (bookmark.content.type) {
case BookmarkTypes.LINK: {
// The crawling job triggers openai when it's done
- await LinkCrawlerQueue.enqueue(
+ // Use a separate queue for low priority crawling to avoid impacting main queue parallelism
+ const crawlerQueue =
+ input.crawlPriority === "low"
+ ? LowPriorityCrawlerQueue
+ : LinkCrawlerQueue;
+ await crawlerQueue.enqueue(
{
bookmarkId: bookmark.id,
},
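
On the caller side, anything that creates bookmarks with a low crawl priority (such as the import flow) now lands on LowPriorityCrawlerQueue. A hypothetical invocation, with the procedure name and surrounding input shape assumed; only the crawlPriority field and its "low" value come from this diff:

declare const trpcClient: any; // hypothetical tRPC client instance

// Hypothetical client call: createBookmark and the type/url fields are
// assumed for illustration; crawlPriority: "low" is what routes the crawl
// job to the low-priority queue in the router code above.
await trpcClient.bookmarks.createBookmark.mutate({
  type: "link",
  url: "https://example.com/imported-article",
  crawlPriority: "low",
});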