diff options
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 97 |
1 files changed, 96 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..7da3f18e 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -10,6 +10,7 @@ import { } from "vitest"; import type { Queue, QueueClient } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { AdminClient } from "../admin"; import { RestateQueueProvider } from "../index"; @@ -49,7 +50,13 @@ type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } | { type: "stall"; durSec: number } - | { type: "semaphore-acquire" }; + | { type: "semaphore-acquire" } + | { + type: "rate-limit"; + val: number; + delayMs: number; + attemptsBeforeSuccess: number; + }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; @@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => { inFlight: 0, maxInFlight: 0, baton: new Baton(), + rateLimitAttempts: new Map<string, number>(), }; async function waitUntilQueueEmpty() { @@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => { testState.inFlight = 0; testState.maxInFlight = 0; testState.baton = new Baton(); + testState.rateLimitAttempts = new Map<string, number>(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => { break; case "semaphore-acquire": await testState.baton.acquire(); + break; + case "rate-limit": { + const attemptKey = `${job.id}`; + const currentAttempts = + testState.rateLimitAttempts.get(attemptKey) || 0; + testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1); + + if (currentAttempts < jobData.attemptsBeforeSuccess) { + throw new QueueRetryAfterError( + `Rate limited (attempt ${currentAttempts + 1})`, + jobData.delayMs, + ); + } + return jobData.val; + } } }, onError: async (job) => { @@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("QueueRetryAfterError handling", () => { + it("should retry after delay without counting against retry attempts", async () => { + const startTime = Date.now(); + + // This job will fail with QueueRetryAfterError twice before succeeding + await queue.enqueue({ + type: "rate-limit", + val: 42, + delayMs: 500, // 500ms delay + attemptsBeforeSuccess: 2, // Fail twice, succeed on third try + }); + + await waitUntilQueueEmpty(); + + const duration = Date.now() - startTime; + + // Should have succeeded + expect(testState.results).toEqual([42]); + + // Should have been called 3 times (2 rate limit failures + 1 success) + expect(testState.rateLimitAttempts.size).toBe(1); + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(3); + + // Should have waited at least 1 second total (2 x 500ms delays) + expect(duration).toBeGreaterThanOrEqual(1000); + + // onError should NOT have been called for rate limit retries + expect(testState.errors).toEqual([]); + }, 60000); + + it("should not exhaust retries when rate limited", async () => { + // This job will be rate limited many more times than the retry limit + // but should still eventually succeed + await queue.enqueue({ + type: "rate-limit", + val: 100, + delayMs: 100, // Short delay for faster test + attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit) + }); + + await waitUntilQueueEmpty(); + + // Should have succeeded despite being "retried" more than the limit + expect(testState.results).toEqual([100]); + + // Should have been called 11 times (10 rate limit failures + 1 success) + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(11); + + // No errors should have been recorded + expect(testState.errors).toEqual([]); + }, 90000); + + it("should still respect retry limit for non-rate-limit errors", async () => { + // Enqueue a regular error job that should fail permanently + await queue.enqueue({ type: "err", err: "Regular error" }); + + await waitUntilQueueEmpty(); + + // Should have failed 4 times (initial + 3 retries) and not succeeded + expect(testState.errors).toEqual([ + "Regular error", + "Regular error", + "Regular error", + "Regular error", + ]); + expect(testState.results).toEqual([]); + }, 90000); + }); }); |
