author    Mohamed Bassem <me@mbassem.com>    2025-11-09 21:20:54 +0000
committer GitHub <noreply@github.com>    2025-11-09 21:20:54 +0000
commit    4cf0856e39c4d69037a6c1a4c3a2a7f803b364a7 (patch)
tree      30c0381668b56edc56daf6840968d239a8201fc0 /apps
parent    b28cd03a4a5f95f429a1429a59319c8a9ac986f8 (diff)
feat: add crawler domain rate limiting (#2115)
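
The change below makes the crawler consult a per-domain rate limit before fetching a URL; when a domain is over its limit, the job is re-enqueued with a jittered delay instead of being crawled. A minimal sketch of the configuration shape the new code reads, in TypeScript (the field names come straight from the diff's use of serverConfig.crawler.domainRatelimiting; the surrounding config plumbing is an assumption, not karakeep's actual code):

    // Sketch only: shape inferred from the diff. When this config is absent,
    // checkDomainRateLimit() below short-circuits and the crawl proceeds.
    interface DomainRateLimitConfig {
      maxRequests: number; // crawls allowed per window, per hostname
      windowMs: number;    // fixed-window length in milliseconds
    }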
Diffstat (limited to 'apps')
-rw-r--r--  apps/workers/workers/crawlerWorker.ts  84
1 file changed, 80 insertions(+), 4 deletions(-)
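
For context on the first hunks below: the worker's run() function now returns a typed result that the onComplete hook receives, which is how a rescheduled job avoids being counted as a normal completion. A hedged sketch of that contract in TypeScript (createRunner's real signature lives in @karakeep/shared/queueing and is not part of this diff; the interfaces here are illustrative only):

    // Hypothetical shape of the runner wiring visible in the diff: run()
    // produces a result, onComplete consumes it.
    interface DequeuedJob<T> {
      id: string;
      data: T;
      priority?: number;
      runNumber: number;
    }
    interface Runner<T, R> {
      run: (job: DequeuedJob<T>) => Promise<R>;
      onComplete: (job: DequeuedJob<T>, result: R) => Promise<void>;
    }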
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 5b49b23e..07a74757 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -77,6 +77,7 @@ import {
EnqueueOptions,
getQueueClient,
} from "@karakeep/shared/queueing";
+import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -170,6 +171,10 @@ const cookieSchema = z.object({
const cookiesSchema = z.array(cookieSchema);
+interface CrawlerRunResult {
+ status: "completed" | "rescheduled";
+}
+
function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] {
const { proxy } = serverConfig;
@@ -298,11 +303,20 @@ export class CrawlerWorker {
}
logger.info("Starting crawler worker ...");
- const worker = (await getQueueClient())!.createRunner<ZCrawlLinkRequest>(
+ const worker = (await getQueueClient())!.createRunner<
+ ZCrawlLinkRequest,
+ CrawlerRunResult
+ >(
LinkCrawlerQueue,
{
run: runCrawler,
- onComplete: async (job) => {
+ onComplete: async (job, result) => {
+ if (result.status === "rescheduled") {
+ logger.info(
+ `[Crawler][${job.id}] Rescheduled due to domain rate limiting`,
+ );
+ return;
+ }
workerStatsCounter.labels("crawler", "completed").inc();
const jobId = job.id;
logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -1259,7 +1273,57 @@ async function crawlAndParseUrl(
};
}
-async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
+/**
+ * Checks if the domain should be rate limited and reschedules the job if needed.
+ * @returns true if the job should continue, false if it was rescheduled
+ */
+async function checkDomainRateLimit(
+ url: string,
+ jobId: string,
+ jobData: ZCrawlLinkRequest,
+ jobPriority?: number,
+): Promise<boolean> {
+ const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
+ if (!crawlerDomainRateLimitConfig) {
+ return true;
+ }
+
+ const rateLimitClient = await getRateLimitClient();
+ if (!rateLimitClient) {
+ return true;
+ }
+
+ const hostname = new URL(url).hostname;
+ const rateLimitResult = rateLimitClient.checkRateLimit(
+ {
+ name: "domain-ratelimit",
+ maxRequests: crawlerDomainRateLimitConfig.maxRequests,
+ windowMs: crawlerDomainRateLimitConfig.windowMs,
+ },
+ hostname,
+ );
+
+ if (!rateLimitResult.allowed) {
+ const resetInSeconds = rateLimitResult.resetInSeconds;
+    // Add jitter to prevent a thundering herd: up to +40% random variation
+ const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
+ const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
+ logger.info(
+ `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+ );
+ await LinkCrawlerQueue.enqueue(jobData, {
+ priority: jobPriority,
+ delayMs,
+ });
+ return false;
+ }
+
+ return true;
+}
+
+async function runCrawler(
+ job: DequeuedJob<ZCrawlLinkRequest>,
+): Promise<CrawlerRunResult> {
const jobId = `${job.id}:${job.runNumber}`;
const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -1267,7 +1331,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
logger.error(
`[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
);
- return;
+ return { status: "completed" };
}
const { bookmarkId, archiveFullPage } = request.data;
@@ -1281,6 +1345,17 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
precrawledArchiveAssetId,
} = await getBookmarkDetails(bookmarkId);
+ const shouldContinue = await checkDomainRateLimit(
+ url,
+ jobId,
+ job.data,
+ job.priority,
+ );
+
+ if (!shouldContinue) {
+ return { status: "rescheduled" };
+ }
+
logger.info(
`[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
);
@@ -1371,4 +1446,5 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
}
+ return { status: "completed" };
}
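
Taken together, checkDomainRateLimit() asks the shared rate-limit client whether this hostname may be crawled now and, if not, re-enqueues the same payload with a delay of at least the window's remaining time. The following self-contained TypeScript sketch shows one way the pieces could fit; the in-memory fixed-window store and helper names are hypothetical (the real client behind getRateLimitClient() lives in @karakeep/shared/ratelimiting and is not part of this diff), but the jitter arithmetic mirrors the diff exactly:

    // Hypothetical fixed-window limiter keyed by hostname. Not the karakeep
    // implementation; a sketch of the interface the diff consumes.
    interface RateLimitResult {
      allowed: boolean;
      resetInSeconds: number; // time until the current window resets
    }

    const windows = new Map<string, { count: number; windowStart: number }>();

    function checkRateLimit(
      opts: { name: string; maxRequests: number; windowMs: number },
      key: string,
    ): RateLimitResult {
      const now = Date.now();
      const mapKey = `${opts.name}:${key}`;
      const entry = windows.get(mapKey);
      if (!entry || now - entry.windowStart >= opts.windowMs) {
        // First request in a fresh window for this hostname.
        windows.set(mapKey, { count: 1, windowStart: now });
        return { allowed: true, resetInSeconds: opts.windowMs / 1000 };
      }
      entry.count += 1;
      return {
        allowed: entry.count <= opts.maxRequests,
        resetInSeconds: (entry.windowStart + opts.windowMs - now) / 1000,
      };
    }

    // Delay computation as in the diff: wait out the remaining window, then
    // inflate by up to 40% so rescheduled jobs for the same domain don't all
    // wake at once.
    function rescheduleDelayMs(resetInSeconds: number): number {
      const jitterFactor = 1.0 + Math.random() * 0.4; // 1.0 .. 1.4
      return Math.floor(resetInSeconds * 1000 * jitterFactor);
    }

    // Example: with maxRequests = 2 and windowMs = 60_000, a third crawl of
    // "example.com" inside the same minute is disallowed and gets delayed by
    // the window's remaining time plus up to 40% extra.
    const cfg = { name: "domain-ratelimit", maxRequests: 2, windowMs: 60_000 };
    checkRateLimit(cfg, "example.com");
    checkRateLimit(cfg, "example.com");
    const third = checkRateLimit(cfg, "example.com");
    if (!third.allowed) {
      console.log(`reschedule in ${rescheduleDelayMs(third.resetInSeconds)} ms`);
    }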