From ddd4b578cd193612f3b6927524d4a30e65fd4e44 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Thu, 25 Dec 2025 14:46:45 +0200 Subject: fix: preserve failure count when rescheduling rate limited domains (#2303) * fix: preserve retry count when rate-limited jobs are rescheduled Previously, when a domain was rate-limited in the crawler worker, the job would be re-enqueued as a new job, which reset the failure count. This meant rate-limited jobs could retry indefinitely without respecting the max retry limit. This commit introduces a RateLimitRetryError exception that signals the queue system to retry the job after a delay without counting it as a failed attempt. The job is retried within the same invocation, preserving the original retry count. Changes: - Add RateLimitRetryError class to shared/queueing.ts - Update crawler worker to throw RateLimitRetryError instead of re-enqueuing - Update Restate queue service to handle RateLimitRetryError with delay - Update Liteque queue wrapper to handle RateLimitRetryError with delay This ensures that rate-limited jobs respect the configured retry limits while still allowing for delayed retries when domains are rate-limited. * refactor: use liteque's native RetryAfterError for rate limiting Instead of manually handling retries in a while loop, translate RateLimitRetryError to liteque's native RetryAfterError. This is cleaner and lets liteque handle the retry logic using its built-in mechanism. * test: add tests for RateLimitRetryError handling in restate queue Added comprehensive tests to verify that: 1. RateLimitRetryError delays retry appropriately 2. Rate-limited retries don't count against the retry limit 3. Jobs can be rate-limited more times than the retry limit 4. Regular errors still respect the retry limit These tests ensure the queue correctly handles rate limiting without exhausting retry attempts. * lint & format * fix: prevent onError callback for RateLimitRetryError Fixed two issues with RateLimitRetryError handling in restate queue: 1. RateLimitRetryError now doesn't trigger the onError callback since it's not a real error - it's an expected rate limiting behavior 2. Check for RateLimitRetryError in runWorkerLogic before calling onError, ensuring the instanceof check works correctly before the error gets further wrapped by restate Updated tests to verify onError is not called for rate limit retries. * fix: catch RateLimitRetryError before ctx.run wraps it Changed approach to use a discriminated union instead of throwing and catching RateLimitRetryError. Now we catch the error inside the ctx.run callback before it gets wrapped by restate's TerminalError, and return a RunResult type that indicates success, rate limit, or error. This fixes the issue where instanceof checks would fail because ctx.run wraps all errors in TerminalError. * more fixes * rename error name --------- Co-authored-by: Claude --- packages/plugins/queue-restate/src/service.ts | 49 +++++++++-- .../plugins/queue-restate/src/tests/queue.test.ts | 97 +++++++++++++++++++++- 2 files changed, 137 insertions(+), 9 deletions(-) (limited to 'packages/plugins/queue-restate/src') diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..2b5716ee 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -6,6 +6,7 @@ import type { RunnerFuncs, RunnerOptions, } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { tryCatch } from "@karakeep/shared/tryCatch"; import { genId } from "./idProvider"; @@ -65,7 +66,8 @@ export function buildRestateService( ); let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + let runNumber = 0; + while (runNumber <= NUM_RETRIES) { const acquired = await semaphore.acquire( priority, data.groupId, @@ -83,14 +85,24 @@ export function buildRestateService( abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), }); await semaphore.release(); - if (res.error) { + + if (res.type === "rate_limit") { + // Handle rate limit retries without counting against retry attempts + await ctx.sleep(res.delayMs, "rate limit retry"); + // Don't increment runNumber - retry without counting against attempts + continue; + } + + if (res.type === "error") { if (res.error instanceof restate.CancelledError) { throw res.error; } lastError = res.error; // TODO: add backoff - await ctx.sleep(1000); + await ctx.sleep(1000, "error retry"); + runNumber++; } else { + // Success break; } } @@ -105,6 +117,11 @@ export function buildRestateService( }); } +type RunResult = + | { type: "success"; value: R } + | { type: "rate_limit"; delayMs: number } + | { type: "error"; error: Error }; + async function runWorkerLogic( ctx: restate.Context, { run, onError, onComplete }: RunnerFuncs, @@ -116,18 +133,26 @@ async function runWorkerLogic( numRetriesLeft: number; abortSignal: AbortSignal; }, -) { +): Promise> { const res = await tryCatch( ctx.run( `main logic`, async () => { - return await run(data); + const res = await tryCatch(run(data)); + if (res.error) { + if (res.error instanceof QueueRetryAfterError) { + return { type: "rate_limit" as const, delayMs: res.error.delayMs }; + } + throw res.error; // Rethrow + } + return { type: "success" as const, value: res.data }; }, { maxRetryAttempts: 1, }, ), ); + if (res.error) { await tryCatch( ctx.run( @@ -142,13 +167,21 @@ async function runWorkerLogic( }, ), ); - return res; + return { type: "error", error: res.error }; + } + + const result = res.data; + + if (result.type === "rate_limit") { + // Don't call onError or onComplete for rate limit retries + return result; } + // Success case - call onComplete await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data, res.data), { + ctx.run("onComplete", async () => await onComplete?.(data, result.value), { maxRetryAttempts: 1, }), ); - return res; + return result; } diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..7da3f18e 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -10,6 +10,7 @@ import { } from "vitest"; import type { Queue, QueueClient } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { AdminClient } from "../admin"; import { RestateQueueProvider } from "../index"; @@ -49,7 +50,13 @@ type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } | { type: "stall"; durSec: number } - | { type: "semaphore-acquire" }; + | { type: "semaphore-acquire" } + | { + type: "rate-limit"; + val: number; + delayMs: number; + attemptsBeforeSuccess: number; + }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; @@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => { inFlight: 0, maxInFlight: 0, baton: new Baton(), + rateLimitAttempts: new Map(), }; async function waitUntilQueueEmpty() { @@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => { testState.inFlight = 0; testState.maxInFlight = 0; testState.baton = new Baton(); + testState.rateLimitAttempts = new Map(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => { break; case "semaphore-acquire": await testState.baton.acquire(); + break; + case "rate-limit": { + const attemptKey = `${job.id}`; + const currentAttempts = + testState.rateLimitAttempts.get(attemptKey) || 0; + testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1); + + if (currentAttempts < jobData.attemptsBeforeSuccess) { + throw new QueueRetryAfterError( + `Rate limited (attempt ${currentAttempts + 1})`, + jobData.delayMs, + ); + } + return jobData.val; + } } }, onError: async (job) => { @@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("QueueRetryAfterError handling", () => { + it("should retry after delay without counting against retry attempts", async () => { + const startTime = Date.now(); + + // This job will fail with QueueRetryAfterError twice before succeeding + await queue.enqueue({ + type: "rate-limit", + val: 42, + delayMs: 500, // 500ms delay + attemptsBeforeSuccess: 2, // Fail twice, succeed on third try + }); + + await waitUntilQueueEmpty(); + + const duration = Date.now() - startTime; + + // Should have succeeded + expect(testState.results).toEqual([42]); + + // Should have been called 3 times (2 rate limit failures + 1 success) + expect(testState.rateLimitAttempts.size).toBe(1); + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(3); + + // Should have waited at least 1 second total (2 x 500ms delays) + expect(duration).toBeGreaterThanOrEqual(1000); + + // onError should NOT have been called for rate limit retries + expect(testState.errors).toEqual([]); + }, 60000); + + it("should not exhaust retries when rate limited", async () => { + // This job will be rate limited many more times than the retry limit + // but should still eventually succeed + await queue.enqueue({ + type: "rate-limit", + val: 100, + delayMs: 100, // Short delay for faster test + attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit) + }); + + await waitUntilQueueEmpty(); + + // Should have succeeded despite being "retried" more than the limit + expect(testState.results).toEqual([100]); + + // Should have been called 11 times (10 rate limit failures + 1 success) + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(11); + + // No errors should have been recorded + expect(testState.errors).toEqual([]); + }, 90000); + + it("should still respect retry limit for non-rate-limit errors", async () => { + // Enqueue a regular error job that should fail permanently + await queue.enqueue({ type: "err", err: "Regular error" }); + + await waitUntilQueueEmpty(); + + // Should have failed 4 times (initial + 3 retries) and not succeeded + expect(testState.errors).toEqual([ + "Regular error", + "Regular error", + "Regular error", + "Regular error", + ]); + expect(testState.results).toEqual([]); + }, 90000); + }); }); -- cgit v1.2.3-70-g09d2