 apps/workers/workers/crawlerWorker.ts                  | 51
 packages/plugins/queue-liteque/src/index.ts            | 20
 packages/plugins/queue-restate/src/service.ts          | 49
 packages/plugins/queue-restate/src/tests/queue.test.ts | 97
 packages/shared/queueing.ts                            | 15
 5 files changed, 184 insertions(+), 48 deletions(-)
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index aedf4aa0..3591474e 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -77,6 +77,7 @@ import {
   DequeuedJob,
   EnqueueOptions,
   getQueueClient,
+  QueueRetryAfterError,
 } from "@karakeep/shared/queueing";
 import { getRateLimitClient } from "@karakeep/shared/ratelimiting";
 import { tryCatch } from "@karakeep/shared/tryCatch";
@@ -187,7 +188,7 @@ const cookieSchema = z.object({
 const cookiesSchema = z.array(cookieSchema);
 
 interface CrawlerRunResult {
-  status: "completed" | "rescheduled";
+  status: "completed";
 }
 
 function getPlaywrightProxyConfig(): BrowserContextOptions["proxy"] {
@@ -325,13 +326,7 @@ export class CrawlerWorker {
       LinkCrawlerQueue,
       {
         run: runCrawler,
-        onComplete: async (job, result) => {
-          if (result.status === "rescheduled") {
-            logger.info(
-              `[Crawler][${job.id}] Rescheduled due to domain rate limiting`,
-            );
-            return;
-          }
+        onComplete: async (job) => {
           workerStatsCounter.labels("crawler", "completed").inc();
           const jobId = job.id;
           logger.info(`[Crawler][${jobId}] Completed successfully`);
@@ -1308,24 +1303,18 @@ async function crawlAndParseUrl(
 }
 
 /**
- * Checks if the domain should be rate limited and reschedules the job if needed.
- * @returns true if the job should continue, false if it was rescheduled
+ * Checks if the domain should be rate limited and throws QueueRetryAfterError if needed.
+ * @throws {QueueRetryAfterError} if the domain is rate limited
  */
-async function checkDomainRateLimit(
-  url: string,
-  jobId: string,
-  jobData: ZCrawlLinkRequest,
-  userId: string,
-  jobPriority?: number,
-): Promise<boolean> {
+async function checkDomainRateLimit(url: string, jobId: string): Promise<void> {
   const crawlerDomainRateLimitConfig = serverConfig.crawler.domainRatelimiting;
   if (!crawlerDomainRateLimitConfig) {
-    return true;
+    return;
   }
 
   const rateLimitClient = await getRateLimitClient();
   if (!rateLimitClient) {
-    return true;
+    return;
   }
 
   const hostname = new URL(url).hostname;
@@ -1344,17 +1333,13 @@ async function checkDomainRateLimit(
     const jitterFactor = 1.0 + Math.random() * 0.4; // Random value between 1.0 and 1.4
     const delayMs = Math.floor(resetInSeconds * 1000 * jitterFactor);
     logger.info(
-      `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Rescheduling in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
+      `[Crawler][${jobId}] Domain "${hostname}" is rate limited. Will retry in ${(delayMs / 1000).toFixed(2)} seconds (with jitter).`,
     );
-    await LinkCrawlerQueue.enqueue(jobData, {
-      priority: jobPriority,
+    throw new QueueRetryAfterError(
+      `Domain "${hostname}" is rate limited`,
       delayMs,
-      groupId: userId,
-    });
-    return false;
+    );
   }
-
-  return true;
 }
 
 async function runCrawler(
@@ -1381,17 +1366,7 @@ async function runCrawler(
     precrawledArchiveAssetId,
   } = await getBookmarkDetails(bookmarkId);
 
-  const shouldContinue = await checkDomainRateLimit(
-    url,
-    jobId,
-    job.data,
-    userId,
-    job.priority,
-  );
-
-  if (!shouldContinue) {
-    return { status: "rescheduled" };
-  }
+  await checkDomainRateLimit(url, jobId);
 
   logger.info(
     `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
index 94fa795f..b809d158 100644
--- a/packages/plugins/queue-liteque/src/index.ts
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -4,10 +4,12 @@ import {
   SqliteQueue as LQ,
   Runner as LQRunner,
   migrateDB,
+  RetryAfterError,
 } from "liteque";
 
 import type { PluginProvider } from "@karakeep/shared/plugins";
 import type {
+  DequeuedJob,
   EnqueueOptions,
   Queue,
   QueueClient,
@@ -17,6 +19,7 @@ import type {
   RunnerOptions,
 } from "@karakeep/shared/queueing";
 import serverConfig from "@karakeep/shared/config";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
 
 class LitequeQueueWrapper<T> implements Queue<T> {
   constructor(
@@ -91,10 +94,25 @@ class LitequeQueueClient implements QueueClient {
       throw new Error(`Queue ${name} not found`);
     }
 
+    // Wrap the run function to translate QueueRetryAfterError to liteque's RetryAfterError
+    const wrappedRun = async (job: DequeuedJob<T>): Promise<R> => {
+      try {
+        return await funcs.run(job);
+      } catch (error) {
+        if (error instanceof QueueRetryAfterError) {
+          // Translate to liteque's native RetryAfterError
+          // This will cause liteque to retry after the delay without counting against attempts
+          throw new RetryAfterError(error.delayMs);
+        }
+        // Re-throw any other errors
+        throw error;
+      }
+    };
+
     const runner = new LQRunner<T, R>(
       wrapper._impl,
       {
-        run: funcs.run,
+        run: wrappedRun,
         onComplete: funcs.onComplete,
         onError: funcs.onError,
       },
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..2b5716ee 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -6,6 +6,7 @@ import type {
   RunnerFuncs,
   RunnerOptions,
 } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
 import { tryCatch } from "@karakeep/shared/tryCatch";
 
 import { genId } from "./idProvider";
@@ -65,7 +66,8 @@ export function buildRestateService<T, R>(
       );
 
       let lastError: Error | undefined;
-      for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+      let runNumber = 0;
+      while (runNumber <= NUM_RETRIES) {
         const acquired = await semaphore.acquire(
           priority,
           data.groupId,
@@ -83,14 +85,24 @@ export function buildRestateService<T, R>(
           abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
         });
         await semaphore.release();
-        if (res.error) {
+
+        if (res.type === "rate_limit") {
+          // Handle rate limit retries without counting against retry attempts
+          await ctx.sleep(res.delayMs, "rate limit retry");
+          // Don't increment runNumber - retry without counting against attempts
+          continue;
+        }
+
+        if (res.type === "error") {
           if (res.error instanceof restate.CancelledError) {
             throw res.error;
           }
           lastError = res.error;
           // TODO: add backoff
-          await ctx.sleep(1000);
+          await ctx.sleep(1000, "error retry");
+          runNumber++;
         } else {
+          // Success
           break;
         }
       }
@@ -105,6 +117,11 @@ export function buildRestateService<T, R>(
   });
 }
 
+type RunResult<R> =
+  | { type: "success"; value: R }
+  | { type: "rate_limit"; delayMs: number }
+  | { type: "error"; error: Error };
+
 async function runWorkerLogic<T, R>(
   ctx: restate.Context,
   { run, onError, onComplete }: RunnerFuncs<T, R>,
@@ -116,18 +133,26 @@ async function runWorkerLogic<T, R>(
     numRetriesLeft: number;
     abortSignal: AbortSignal;
   },
-) {
+): Promise<RunResult<R>> {
   const res = await tryCatch(
     ctx.run(
       `main logic`,
       async () => {
-        return await run(data);
+        const res = await tryCatch(run(data));
+        if (res.error) {
+          if (res.error instanceof QueueRetryAfterError) {
+            return { type: "rate_limit" as const, delayMs: res.error.delayMs };
+          }
+          throw res.error; // Rethrow
+        }
+        return { type: "success" as const, value: res.data };
       },
       {
         maxRetryAttempts: 1,
       },
     ),
   );
+
   if (res.error) {
     await tryCatch(
       ctx.run(
@@ -142,13 +167,21 @@ async function runWorkerLogic<T, R>(
         },
       ),
     );
-    return res;
+    return { type: "error", error: res.error };
+  }
+
+  const result = res.data;
+
+  if (result.type === "rate_limit") {
+    // Don't call onError or onComplete for rate limit retries
+    return result;
   }
 
+  // Success case - call onComplete
   await tryCatch(
-    ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
+    ctx.run("onComplete", async () => await onComplete?.(data, result.value), {
       maxRetryAttempts: 1,
     }),
   );
-  return res;
+  return result;
 }
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..7da3f18e 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -10,6 +10,7 @@ import {
 } from "vitest";
 
 import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
 
 import { AdminClient } from "../admin";
 import { RestateQueueProvider } from "../index";
@@ -49,7 +50,13 @@ type TestAction =
   | { type: "val"; val: number }
   | { type: "err"; err: string }
   | { type: "stall"; durSec: number }
-  | { type: "semaphore-acquire" };
+  | { type: "semaphore-acquire" }
+  | {
+      type: "rate-limit";
+      val: number;
+      delayMs: number;
+      attemptsBeforeSuccess: number;
+    };
 
 describe("Restate Queue Provider", () => {
   let queueClient: QueueClient;
@@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => {
     inFlight: 0,
     maxInFlight: 0,
     baton: new Baton(),
+    rateLimitAttempts: new Map<string, number>(),
   };
 
   async function waitUntilQueueEmpty() {
@@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => {
     testState.inFlight = 0;
     testState.maxInFlight = 0;
     testState.baton = new Baton();
+    testState.rateLimitAttempts = new Map<string, number>();
   });
   afterEach(async () => {
     await waitUntilQueueEmpty();
@@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => {
           break;
         case "semaphore-acquire":
           await testState.baton.acquire();
+          break;
+        case "rate-limit": {
+          const attemptKey = `${job.id}`;
+          const currentAttempts =
+            testState.rateLimitAttempts.get(attemptKey) || 0;
+          testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
+
+          if (currentAttempts < jobData.attemptsBeforeSuccess) {
+            throw new QueueRetryAfterError(
+              `Rate limited (attempt ${currentAttempts + 1})`,
+              jobData.delayMs,
+            );
+          }
+          return jobData.val;
+        }
       }
     },
     onError: async (job) => {
@@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => {
       expect(testState.results).toEqual([102, 101, 100]);
     }, 60000);
   });
+
+  describe("QueueRetryAfterError handling", () => {
+    it("should retry after delay without counting against retry attempts", async () => {
+      const startTime = Date.now();
+
+      // This job will fail with QueueRetryAfterError twice before succeeding
+      await queue.enqueue({
+        type: "rate-limit",
+        val: 42,
+        delayMs: 500, // 500ms delay
+        attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
+      });
+
+      await waitUntilQueueEmpty();
+
+      const duration = Date.now() - startTime;
+
+      // Should have succeeded
+      expect(testState.results).toEqual([42]);
+
+      // Should have been called 3 times (2 rate limit failures + 1 success)
+      expect(testState.rateLimitAttempts.size).toBe(1);
+      const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+      expect(attempts).toBe(3);
+
+      // Should have waited at least 1 second total (2 x 500ms delays)
+      expect(duration).toBeGreaterThanOrEqual(1000);
+
+      // onError should NOT have been called for rate limit retries
+      expect(testState.errors).toEqual([]);
+    }, 60000);
+
+    it("should not exhaust retries when rate limited", async () => {
+      // This job will be rate limited many more times than the retry limit
+      // but should still eventually succeed
+      await queue.enqueue({
+        type: "rate-limit",
+        val: 100,
+        delayMs: 100, // Short delay for faster test
+        attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
+      });
+
+      await waitUntilQueueEmpty();
+
+      // Should have succeeded despite being "retried" more than the limit
+      expect(testState.results).toEqual([100]);
+
+      // Should have been called 11 times (10 rate limit failures + 1 success)
+      const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+      expect(attempts).toBe(11);
+
+      // No errors should have been recorded
+      expect(testState.errors).toEqual([]);
+    }, 90000);
+
+    it("should still respect retry limit for non-rate-limit errors", async () => {
+      // Enqueue a regular error job that should fail permanently
+      await queue.enqueue({ type: "err", err: "Regular error" });
+
+      await waitUntilQueueEmpty();
+
+      // Should have failed 4 times (initial + 3 retries) and not succeeded
+      expect(testState.errors).toEqual([
+        "Regular error",
+        "Regular error",
+        "Regular error",
+        "Regular error",
+      ]);
+      expect(testState.results).toEqual([]);
+    }, 90000);
+  });
 });
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
index 0dd6ed6b..bc2c9cfa 100644
--- a/packages/shared/queueing.ts
+++ b/packages/shared/queueing.ts
@@ -2,6 +2,21 @@ import { ZodType } from "zod";
 
 import { PluginManager, PluginType } from "./plugins";
 
+/**
+ * Special error that indicates a job should be retried after a delay
+ * without counting against the retry attempts limit.
+ * Useful for handling rate limiting scenarios.
+ */
+export class QueueRetryAfterError extends Error {
+  constructor(
+    message: string,
+    public readonly delayMs: number,
+  ) {
+    super(message);
+    this.name = "QueueRetryAfterError";
+  }
+}
+
 export interface EnqueueOptions {
   idempotencyKey?: string;
   priority?: number;
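Usage note: the sketch below is not part of the diff; it illustrates how any worker's run function can signal rate limiting under the new contract. Only QueueRetryAfterError, its (message, delayMs) constructor, and the DequeuedJob type come from the changes above; the payload shape and the fetch-based handler are hypothetical.

    import type { DequeuedJob } from "@karakeep/shared/queueing";
    import { QueueRetryAfterError } from "@karakeep/shared/queueing";

    // Hypothetical job payload, for illustration only.
    interface FetchJobPayload {
      url: string;
    }

    // Works with either backend: liteque translates the error into its native
    // RetryAfterError, and the restate service sleeps for delayMs without
    // incrementing runNumber. Any other thrown error still consumes one of
    // the job's normal retry attempts.
    async function run(job: DequeuedJob<FetchJobPayload>): Promise<void> {
      const res = await fetch(job.data.url);
      if (res.status === 429) {
        const retryAfterSecs = Number(res.headers.get("retry-after") ?? "60");
        throw new QueueRetryAfterError(
          `Upstream rate limited ${job.data.url}`,
          retryAfterSecs * 1000,
        );
      }
      // ...normal processing on success...
    }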
