diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-12-25 14:46:45 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-25 12:46:45 +0000 |
| commit | ddd4b578cd193612f3b6927524d4a30e65fd4e44 (patch) | |
| tree | 59fec58ccf2686282d94c406abe3108c3c4e8d47 /apps | |
| parent | ef27670f5c027be87d279b9b32553e980e55d888 (diff) | |
| download | karakeep-ddd4b578cd193612f3b6927524d4a30e65fd4e44.tar.zst | |
fix: preserve failure count when rescheduling rate limited domains (#2303)
* fix: preserve retry count when rate-limited jobs are rescheduled
Previously, when a domain was rate-limited in the crawler worker,
the job would be re-enqueued as a new job, which reset the failure
count. This meant rate-limited jobs could retry indefinitely without
respecting the max retry limit.
This commit introduces a RateLimitRetryError exception that signals
the queue system to retry the job after a delay without counting it
as a failed attempt. The job is retried within the same invocation,
preserving the original retry count.
Changes:
- Add RateLimitRetryError class to shared/queueing.ts
- Update crawler worker to throw RateLimitRetryError instead of re-enqueuing
- Update Restate queue service to handle RateLimitRetryError with delay
- Update Liteque queue wrapper to handle RateLimitRetryError with delay
This ensures that rate-limited jobs respect the configured retry limits
while still allowing for delayed retries when domains are rate-limited.
* refactor: use liteque's native RetryAfterError for rate limiting
Instead of manually handling retries in a while loop, translate
RateLimitRetryError to liteque's native RetryAfterError. This is
cleaner and lets liteque handle the retry logic using its built-in
mechanism.
* test: add tests for RateLimitRetryError handling in restate queue
Added comprehensive tests to verify that:
1. RateLimitRetryError delays retry appropriately
2. Rate-limited retries don't count against the retry limit
3. Jobs can be rate-limited more times than the retry limit
4. Regular errors still respect the retry limit
These tests ensure the queue correctly handles rate limiting
without exhausting retry attempts.
* lint & format
* fix: prevent onError callback for RateLimitRetryError
Fixed two issues with RateLimitRetryError handling in restate queue:
1. RateLimitRetryError now doesn't trigger the onError callback since
it's not a real error - it's an expected rate limiting behavior
2. Check for RateLimitRetryError in runWorkerLogic before calling onError,
ensuring the instanceof check works correctly before the error gets
further wrapped by restate
Updated tests to verify onError is not called for rate limit retries.
* fix: catch RateLimitRetryError before ctx.run wraps it
Changed approach to use a discriminated union instead of throwing
and catching RateLimitRetryError. Now we catch the error inside the
ctx.run callback before it gets wrapped by restate's TerminalError,
and return a RunResult type that indicates success, rate limit, or error.
This fixes the issue where instanceof checks would fail because
ctx.run wraps all errors in TerminalError.
* more fixes
* rename error name
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 51 |
1 files changed, 13 insertions, 38 deletions
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index aedf4aa0..3591474e 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -77,6 +77,7 @@ import { DequeuedJob, EnqueueOptions, getQueueClient, + QueueRetryAfterError, } from "@karakeep/shared/queueing"; import { getRateLimitClient } from "@karakeep/shared/ratelimiting"; import { tryCatch } from "@karakeep/shared/tryCatch"; @@ -187,7 +188,7 @@ const cookieSchema = z.object({ const cookiesSchema = z.array(cookieSchema); interface CrawlerRunResult { - status: "completed" | "rescheduled"; + status: "completed"; } function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] { @@ -325,13 +326,7 @@ export class CrawlerWorker { LinkCrawlerQueue, { run: runCrawler, - onComplete: async (job, result) => { - if (result.status === "rescheduled") { - logger.info( - `[Crawler][${job.id}] Rescheduled due to domain rate limiting`, - ); - return; - } + onComplete: async (job) => { workerStatsCounter.labels("crawler", "completed").inc(); const jobId = job.id; logger.info(`[Crawler][${jobId}] Completed successfully`); @@ -1308,24 +1303,18 @@ async function crawlAndParseUrl( } /** - * Checks if the domain should be rate limited and reschedules the job if needed. - * @returns true if the job should continue, false if it was rescheduled + * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed. + * @throws {QueueRetryAfterError} if the domain is rate limited */ -async function checkDomainRateLimit( - url: string, - jobId: string, - jobData: ZCrawlLinkRequest, - userId: string, - jobPriority?: number, -): Promise<boolean> { +async function checkDomainRateLimit(url: string, jobId: string): Promise<void> { const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting; if (!crawlerDomainRateLimitConfig) { - return true; + return; } const rateLimitClient = await getRateLimitClient(); if (!rateLimitClient) { - return true; + return; } const hostname = new URL(url).hostname; @@ -1344,17 +1333,13 @@ async function checkDomainRateLimit( const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4 const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor); logger.info( - `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, + `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`, ); - await LinkCrawlerQueue.enqueue(jobData, { - priority: jobPriority, + throw new QueueRetryAfterError( + `Domain "${hostname}" is rate limited`, delayMs, - groupId: userId, - }); - return false; + ); } - - return true; } async function runCrawler( @@ -1381,17 +1366,7 @@ async function runCrawler( precrawledArchiveAssetId, } = await getBookmarkDetails(bookmarkId); - const shouldContinue = await checkDomainRateLimit( - url, - jobId, - job.data, - userId, - job.priority, - ); - - if (!shouldContinue) { - return { status: "rescheduled" }; - } + await checkDomainRateLimit(url, jobId); logger.info( `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, |
