From 4cf0856e39c4d69037a6c1a4c3a2a7f803b364a7 Mon Sep 17 00:00:00 2001
From: Mohamed Bassem
Date: Sun, 9 Nov 2025 21:20:54 +0000
Subject: feat: add crawler domain rate limiting (#2115)

---
 apps/workers/workers/crawlerWorker.ts | 84 +++++++++++++++++++++++++++++++++--
 1 file changed, 80 insertions(+), 4 deletions(-)

diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 5b49b23e..07a74757 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -77,6 +77,7 @@ import {
   EnqueueOptions,
   getQueueClient,
 } from "@karakeep/shared/queueing";
+import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
 import { tryCatch } from "@karakeep/shared/tryCatch";
 import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
 
@@ -170,6 +171,10 @@ const cookieSchema = z.object({
 
 const cookiesSchema = z.array(cookieSchema);
 
+interface CrawlerRunResult {
+  status: "completed" | "rescheduled";
+}
+
 function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] {
   const { proxy } = serverConfig;
 
@@ -298,11 +303,20 @@ export class CrawlerWorker {
     }
 
     logger.info("Starting crawler worker ...");
-    const worker = (await getQueueClient())!.createRunner<ZCrawlLinkRequest>(
+    const worker = (await getQueueClient())!.createRunner<
+      ZCrawlLinkRequest,
+      CrawlerRunResult
+    >(
       LinkCrawlerQueue,
       {
         run: runCrawler,
-        onComplete: async (job) => {
+        onComplete: async (job, result) => {
+          if (result.status === "rescheduled") {
+            logger.info(
+              `[Crawler][${job.id}] Rescheduled due to domain rate limiting`,
+            );
+            return;
+          }
           workerStatsCounter.labels("crawler", "completed").inc();
           const jobId = job.id;
           logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -1259,7 +1273,57 @@ async function crawlAndParseUrl(
   };
 }
 
-async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
+/**
+ * Checks if the domain should be rate limited and reschedules the job if needed.
+ * @returns true if the job should continue, false if it was rescheduled
+ */
+async function checkDomainRateLimit(
+  url: string,
+  jobId: string,
+  jobData: ZCrawlLinkRequest,
+  jobPriority?: number,
+): Promise<boolean> {
+  const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
+  if (!crawlerDomainRateLimitConfig) {
+    return true;
+  }
+
+  const rateLimitClient = await getRateLimitClient();
+  if (!rateLimitClient) {
+    return true;
+  }
+
+  const hostname = new URL(url).hostname;
+  const rateLimitResult = rateLimitClient.checkRateLimit(
+    {
+      name: "domain-ratelimit",
+      maxRequests: crawlerDomainRateLimitConfig.maxRequests,
+      windowMs: crawlerDomainRateLimitConfig.windowMs,
+    },
+    hostname,
+  );
+
+  if (!rateLimitResult.allowed) {
+    const resetInSeconds = rateLimitResult.resetInSeconds;
+    // Add jitter to prevent thundering herd: +40% random variation
+    const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
+    const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
+    logger.info(
+      `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+    );
+    await LinkCrawlerQueue.enqueue(jobData, {
+      priority: jobPriority,
+      delayMs,
+    });
+    return false;
+  }
+
+  return true;
+}
+
+async function runCrawler(
+  job: DequeuedJob<ZCrawlLinkRequest>,
+): Promise<CrawlerRunResult> {
   const jobId = `${job.id}:${job.runNumber}`;
 
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -1267,7 +1331,7 @@
     logger.error(
       `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
     );
-    return;
+    return { status: "completed" };
   }
 
   const { bookmarkId, archiveFullPage } = request.data;
@@ -1281,6 +1345,17 @@
     precrawledArchiveAssetId,
   } = await getBookmarkDetails(bookmarkId);
 
+  const shouldContinue = await checkDomainRateLimit(
+    url,
+    jobId,
+    job.data,
+    job.priority,
+  );
+
+  if (!shouldContinue) {
+    return { status: "rescheduled" };
+  }
+
   logger.info(
     `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
   );
@@ -1371,4 +1446,5 @@
     // Do the archival as a separate last step as it has the potential for failure
     await archivalLogic();
   }
+  return { status: "completed" };
 }
-- 
cgit v1.2.3-70-g09d2
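
Note: the patch depends on a rate-limit client whose surface is visible above: getRateLimitClient() resolves to a client (or null when rate limiting is not configured), and checkRateLimit({ name, maxRequests, windowMs }, key) returns { allowed, resetInSeconds }. To trace the control flow in isolation, here is a minimal sketch of a fixed-window limiter satisfying that interface, together with the jittered-reschedule arithmetic from checkDomainRateLimit. The InMemoryRateLimitClient class, its Map-backed store, and the 5-requests-per-10-seconds limits are assumptions made up for illustration; the real @karakeep/shared/ratelimiting implementation is not shown in this diff and is presumably backed by shared state so the limit holds across worker processes.

// Illustrative sketch only, NOT the real @karakeep/shared/ratelimiting code:
// an in-process fixed-window limiter matching the interface the patch calls.

interface RateLimitConfig {
  name: string;
  maxRequests: number;
  windowMs: number;
}

interface RateLimitResult {
  allowed: boolean;
  resetInSeconds: number;
}

class InMemoryRateLimitClient {
  // One counter window per "<limiter name>:<key>" pair.
  private windows = new Map<string, { windowStart: number; count: number }>();

  checkRateLimit(config: RateLimitConfig, key: string): RateLimitResult {
    const now = Date.now();
    const mapKey = `${config.name}:${key}`;
    let entry = this.windows.get(mapKey);

    // Open a fresh window if none exists or the previous one has expired.
    if (!entry || now - entry.windowStart >= config.windowMs) {
      entry = { windowStart: now, count: 0 };
      this.windows.set(mapKey, entry);
    }

    entry.count += 1;
    return {
      allowed: entry.count <= config.maxRequests,
      resetInSeconds: (entry.windowStart + config.windowMs - now) / 1000,
    };
  }
}

// Usage mirroring checkDomainRateLimit: key the limiter on the URL's hostname
// and, once the window is exhausted, compute the jittered reschedule delay.
const client = new InMemoryRateLimitClient();
const hostname = new URL("https://example.com/some/article").hostname;
for (let i = 0; i < 7; i++) {
  const result = client.checkRateLimit(
    { name: "domain-ratelimit", maxRequests: 5, windowMs: 10_000 },
    hostname,
  );
  if (!result.allowed) {
    const jitterFactor = 1.0 + Math.random() * 0.4; // 1.0..1.4, as in the patch
    const delayMs = Math.floor(result.resetInSeconds * 1000 * jitterFactor);
    console.log(`"${hostname}" rate limited; would re-enqueue with delayMs=${delayMs}`);
  }
}

The jitter is the notable design choice here: multiplying the reset delay by a random factor in [1.0, 1.4) spreads a burst of jobs for the same domain across the window instead of re-enqueueing them all for the same instant, which is the thundering-herd problem the inline comment in the patch refers to.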