| author | Mohamed Bassem <me@mbassem.com> | 2025-12-25 14:46:45 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-25 12:46:45 +0000 |
| commit | ddd4b578cd193612f3b6927524d4a30e65fd4e44 (patch) | |
| tree | 59fec58ccf2686282d94c406abe3108c3c4e8d47 /packages/plugins/queue-restate/src/tests/queue.test.ts | |
| parent | ef27670f5c027be87d279b9b32553e980e55d888 (diff) | |
| download | karakeep-ddd4b578cd193612f3b6927524d4a30e65fd4e44.tar.zst | |
fix: preserve failure count when rescheduling rate limited domains (#2303)
* fix: preserve retry count when rate-limited jobs are rescheduled
Previously, when a domain was rate-limited in the crawler worker,
the job would be re-enqueued as a new job, which reset the failure
count. This meant rate-limited jobs could retry indefinitely without
respecting the max retry limit.
This commit introduces a RateLimitRetryError exception that signals
the queue system to retry the job after a delay without counting it
as a failed attempt. The job is retried within the same invocation,
preserving the original retry count.
Changes:
- Add RateLimitRetryError class to shared/queueing.ts
- Update crawler worker to throw RateLimitRetryError instead of re-enqueuing
- Update Restate queue service to handle RateLimitRetryError with delay
- Update Liteque queue wrapper to handle RateLimitRetryError with delay
This ensures that rate-limited jobs respect the configured retry limits
while still allowing for delayed retries when domains are rate-limited.
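A minimal sketch of the error class this commit describes is shown below. Only the constructor shape is taken from the tests in this PR (`new QueueRetryAfterError(message, delayMs)`); the property names are illustrative, and the class was renamed to QueueRetryAfterError later in the PR.

```typescript
// Sketch of the retry-after error added to shared/queueing.ts. Only the
// (message, delayMs) constructor shape is taken from the tests in this diff;
// the rest is an assumption about the implementation.
export class QueueRetryAfterError extends Error {
  constructor(
    message: string,
    // How long the queue should wait before re-running the same invocation.
    public readonly delayMs: number,
  ) {
    super(message);
    this.name = "QueueRetryAfterError";
  }
}

// A worker throws it instead of re-enqueuing a fresh job, e.g.
//   throw new QueueRetryAfterError(`Rate limited for ${domain}`, retryAfterMs);
// (domain and retryAfterMs are hypothetical names), so the queue retries the
// same job without incrementing its failure count.
```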
* refactor: use liteque's native RetryAfterError for rate limiting
Instead of manually handling retries in a while loop, translate
RateLimitRetryError to liteque's native RetryAfterError. This is
cleaner and lets liteque handle the retry logic using its built-in
mechanism.
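The translation could look roughly like the sketch below. It assumes liteque exports a RetryAfterError whose constructor accepts the error message and a retry delay; that signature is an assumption about the library's API, not a documented one, and the wrapper name is hypothetical.

```typescript
// Illustrative sketch, not the actual Liteque wrapper. The RetryAfterError
// constructor arguments are an assumption about liteque's API.
import { RetryAfterError } from "liteque";

import { QueueRetryAfterError } from "@karakeep/shared/queueing";

async function runWithRateLimitTranslation<T>(
  run: () => Promise<T>,
): Promise<T> {
  try {
    return await run();
  } catch (err) {
    if (err instanceof QueueRetryAfterError) {
      // Let liteque's built-in retry mechanism schedule the delayed retry
      // instead of looping manually or re-enqueuing a brand new job.
      throw new RetryAfterError(err.message, err.delayMs);
    }
    throw err;
  }
}
```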
* test: add tests for RateLimitRetryError handling in restate queue
Added comprehensive tests to verify that:
1. RateLimitRetryError delays retry appropriately
2. Rate-limited retries don't count against the retry limit
3. Jobs can be rate-limited more times than the retry limit
4. Regular errors still respect the retry limit
These tests ensure the queue correctly handles rate limiting
without exhausting retry attempts.
* lint & format
* fix: prevent onError callback for RateLimitRetryError
Fixed two issues with RateLimitRetryError handling in restate queue:
1. RateLimitRetryError no longer triggers the onError callback, since it is
not a real error but expected rate-limiting behavior
2. runWorkerLogic now checks for RateLimitRetryError before calling onError,
so the instanceof check runs before restate wraps the error further
Updated tests to verify onError is not called for rate limit retries.
* fix: catch RateLimitRetryError before ctx.run wraps it
Changed approach to use a discriminated union instead of letting
RateLimitRetryError propagate out of ctx.run. We now catch the error inside
the ctx.run callback, before restate wraps it in a TerminalError, and return
a RunResult type that indicates success, rate limit, or error.
This fixes the issue where instanceof checks would fail because
ctx.run wraps all errors in TerminalError.
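A sketch of that shape is below, assuming restate's ctx.run(name, fn) API; the RunResult fields and the helper name are illustrative rather than the exact code in the Restate queue service.

```typescript
// Sketch of the discriminated-union approach described above. RunResult and
// runJob are illustrative names, not the PR's actual identifiers.
import type * as restate from "@restatedev/restate-sdk";

import { QueueRetryAfterError } from "@karakeep/shared/queueing";

type RunResult<T> =
  | { kind: "success"; value: T }
  | { kind: "rate-limit"; delayMs: number }
  | { kind: "error"; message: string };

async function runJob<T>(
  ctx: restate.Context,
  work: () => Promise<T>,
): Promise<RunResult<T>> {
  // Classify the outcome inside the ctx.run callback, before restate gets a
  // chance to wrap the thrown error, so instanceof still sees the original
  // class.
  return await ctx.run("run job", async (): Promise<RunResult<T>> => {
    try {
      return { kind: "success", value: await work() };
    } catch (err) {
      if (err instanceof QueueRetryAfterError) {
        return { kind: "rate-limit", delayMs: err.delayMs };
      }
      return { kind: "error", message: (err as Error).message };
    }
  });
}
```

The caller can then sleep and retry on the "rate-limit" branch without touching the attempt counter, and invoke onError only for the "error" branch.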
* more fixes
* rename error name
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 97 |
1 file changed, 96 insertions(+), 1 deletion(-)
```diff
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..7da3f18e 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -10,6 +10,7 @@ import {
 } from "vitest";
 
 import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
 
 import { AdminClient } from "../admin";
 import { RestateQueueProvider } from "../index";
@@ -49,7 +50,13 @@ type TestAction =
   | { type: "val"; val: number }
   | { type: "err"; err: string }
   | { type: "stall"; durSec: number }
-  | { type: "semaphore-acquire" };
+  | { type: "semaphore-acquire" }
+  | {
+      type: "rate-limit";
+      val: number;
+      delayMs: number;
+      attemptsBeforeSuccess: number;
+    };
 
 describe("Restate Queue Provider", () => {
   let queueClient: QueueClient;
@@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => {
     inFlight: 0,
     maxInFlight: 0,
     baton: new Baton(),
+    rateLimitAttempts: new Map<string, number>(),
   };
 
   async function waitUntilQueueEmpty() {
@@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => {
     testState.inFlight = 0;
     testState.maxInFlight = 0;
     testState.baton = new Baton();
+    testState.rateLimitAttempts = new Map<string, number>();
   });
   afterEach(async () => {
     await waitUntilQueueEmpty();
@@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => {
             break;
           case "semaphore-acquire":
             await testState.baton.acquire();
+            break;
+          case "rate-limit": {
+            const attemptKey = `${job.id}`;
+            const currentAttempts =
+              testState.rateLimitAttempts.get(attemptKey) || 0;
+            testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
+
+            if (currentAttempts < jobData.attemptsBeforeSuccess) {
+              throw new QueueRetryAfterError(
+                `Rate limited (attempt ${currentAttempts + 1})`,
+                jobData.delayMs,
+              );
+            }
+            return jobData.val;
+          }
         }
       },
       onError: async (job) => {
@@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => {
       expect(testState.results).toEqual([102, 101, 100]);
     }, 60000);
   });
+
+  describe("QueueRetryAfterError handling", () => {
+    it("should retry after delay without counting against retry attempts", async () => {
+      const startTime = Date.now();
+
+      // This job will fail with QueueRetryAfterError twice before succeeding
+      await queue.enqueue({
+        type: "rate-limit",
+        val: 42,
+        delayMs: 500, // 500ms delay
+        attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
+      });
+
+      await waitUntilQueueEmpty();
+
+      const duration = Date.now() - startTime;
+
+      // Should have succeeded
+      expect(testState.results).toEqual([42]);
+
+      // Should have been called 3 times (2 rate limit failures + 1 success)
+      expect(testState.rateLimitAttempts.size).toBe(1);
+      const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+      expect(attempts).toBe(3);
+
+      // Should have waited at least 1 second total (2 x 500ms delays)
+      expect(duration).toBeGreaterThanOrEqual(1000);
+
+      // onError should NOT have been called for rate limit retries
+      expect(testState.errors).toEqual([]);
+    }, 60000);
+
+    it("should not exhaust retries when rate limited", async () => {
+      // This job will be rate limited many more times than the retry limit
+      // but should still eventually succeed
+      await queue.enqueue({
+        type: "rate-limit",
+        val: 100,
+        delayMs: 100, // Short delay for faster test
+        attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
+      });
+
+      await waitUntilQueueEmpty();
+
+      // Should have succeeded despite being "retried" more than the limit
+      expect(testState.results).toEqual([100]);
+
+      // Should have been called 11 times (10 rate limit failures + 1 success)
+      const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+      expect(attempts).toBe(11);
+
+      // No errors should have been recorded
+      expect(testState.errors).toEqual([]);
+    }, 90000);
+
+    it("should still respect retry limit for non-rate-limit errors", async () => {
+      // Enqueue a regular error job that should fail permanently
+      await queue.enqueue({ type: "err", err: "Regular error" });
+
+      await waitUntilQueueEmpty();
+
+      // Should have failed 4 times (initial + 3 retries) and not succeeded
+      expect(testState.errors).toEqual([
+        "Regular error",
+        "Regular error",
+        "Regular error",
+        "Regular error",
+      ]);
+      expect(testState.results).toEqual([]);
+    }, 90000);
+  });
 });
```
