diff options
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 51 |
1 files changed, 13 insertions, 38 deletions
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index aedf4aa0..3591474e 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -77,6 +77,7 @@ import { DequeuedJob, EnqueueOptions, getQueueClient, + QueueRetryAfterError, } from "@karakeep/shared/queueing"; import { getRateLimitClient } from "@karakeep/shared/ratelimiting"; import { tryCatch } from "@karakeep/shared/tryCatch"; @@ -187,7 +188,7 @@ const cookieSchema = z.object({ const cookiesSchema = z.array(cookieSchema); interface CrawlerRunResult { - status: "completed" | "rescheduled"; + status: "completed"; } function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] { @@ -325,13 +326,7 @@ export class CrawlerWorker { LinkCrawlerQueue, { run: runCrawler, - onComplete: async (job, result) => { - if (result.status === "rescheduled") { - logger.info( - `[Crawler][${job.id}] Rescheduled due to domain rate limiting`, - ); - return; - } + onComplete: async (job) => { workerStatsCounter.labels("crawler", "completed").inc(); const jobId = job.id; logger.info(`[Crawler][${jobId}] Completed successfully`); @@ -1308,24 +1303,18 @@ async function crawlAndParseUrl( } /** - * Checks if the domain should be rate limited and reschedules the job if needed. - * @returns true if the job should continue, false if it was rescheduled + * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed. + * @throws {QueueRetryAfterError} if the domain is rate limited */ -async function checkDomainRateLimit( - url: string, - jobId: string, - jobData: ZCrawlLinkRequest, - userId: string, - jobPriority?: number, -): Promise<boolean> { +async function checkDomainRateLimit(url: string, jobId: string): Promise<void> { const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; if (!crawlerDomainRateLimitConfig) { - return true; + return; } const rateLimitClient = await getRateLimitClient(); if (!rateLimitClient) { - return true; + return; } const hostname = new URL(url).hostname; @@ -1344,17 +1333,13 @@ async function checkDomainRateLimit( const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4 const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor); logger.info( - `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, + `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, ); - await LinkCrawlerQueue.enqueue(jobData, { - priority: jobPriority, + throw new QueueRetryAfterError( + `Domain "${hostname}" is rate limited`, delayMs, - groupId: userId, - }); - return false; + ); } - - return true; } async function runCrawler( @@ -1381,17 +1366,7 @@ async function runCrawler( precrawledArchiveAssetId, } = await getBookmarkDetails(bookmarkId); - const shouldContinue = await checkDomainRateLimit( - url, - jobId, - job.data, - userId, - job.priority, - ); - - if (!shouldContinue) { - return { status: "rescheduled" }; - } + await checkDomainRateLimit(url, jobId); logger.info( `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, |
