aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/tests/queue.test.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts97
1 files changed, 96 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..7da3f18e 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -10,6 +10,7 @@ import {
} from "vitest";
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { AdminClient } from "../admin";
import { RestateQueueProvider } from "../index";
@@ -49,7 +50,13 @@ type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
| { type: "stall"; durSec: number }
- | { type: "semaphore-acquire" };
+ | { type: "semaphore-acquire" }
+ | {
+ type: "rate-limit";
+ val: number;
+ delayMs: number;
+ attemptsBeforeSuccess: number;
+ };
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
@@ -62,6 +69,7 @@ describe("Restate Queue Provider", () => {
inFlight: 0,
maxInFlight: 0,
baton: new Baton(),
+ rateLimitAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
@@ -81,6 +89,7 @@ describe("Restate Queue Provider", () => {
testState.inFlight = 0;
testState.maxInFlight = 0;
testState.baton = new Baton();
+ testState.rateLimitAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -133,6 +142,21 @@ describe("Restate Queue Provider", () => {
break;
case "semaphore-acquire":
await testState.baton.acquire();
+ break;
+ case "rate-limit": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.rateLimitAttempts.get(attemptKey) || 0;
+ testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ throw new QueueRetryAfterError(
+ `Rate limited (attempt ${currentAttempts + 1})`,
+ jobData.delayMs,
+ );
+ }
+ return jobData.val;
+ }
}
},
onError: async (job) => {
@@ -517,4 +541,75 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([102, 101, 100]);
}, 60000);
});
+
+ describe("QueueRetryAfterError handling", () => {
+ it("should retry after delay without counting against retry attempts", async () => {
+ const startTime = Date.now();
+
+ // This job will fail with QueueRetryAfterError twice before succeeding
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 42,
+ delayMs: 500, // 500ms delay
+ attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
+ });
+
+ await waitUntilQueueEmpty();
+
+ const duration = Date.now() - startTime;
+
+ // Should have succeeded
+ expect(testState.results).toEqual([42]);
+
+ // Should have been called 3 times (2 rate limit failures + 1 success)
+ expect(testState.rateLimitAttempts.size).toBe(1);
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(3);
+
+ // Should have waited at least 1 second total (2 x 500ms delays)
+ expect(duration).toBeGreaterThanOrEqual(1000);
+
+ // onError should NOT have been called for rate limit retries
+ expect(testState.errors).toEqual([]);
+ }, 60000);
+
+ it("should not exhaust retries when rate limited", async () => {
+ // This job will be rate limited many more times than the retry limit
+ // but should still eventually succeed
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 100,
+ delayMs: 100, // Short delay for faster test
+ attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Should have succeeded despite being "retried" more than the limit
+ expect(testState.results).toEqual([100]);
+
+ // Should have been called 11 times (10 rate limit failures + 1 success)
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(11);
+
+ // No errors should have been recorded
+ expect(testState.errors).toEqual([]);
+ }, 90000);
+
+ it("should still respect retry limit for non-rate-limit errors", async () => {
+ // Enqueue a regular error job that should fail permanently
+ await queue.enqueue({ type: "err", err: "Regular error" });
+
+ await waitUntilQueueEmpty();
+
+ // Should have failed 4 times (initial + 3 retries) and not succeeded
+ expect(testState.errors).toEqual([
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ ]);
+ expect(testState.results).toEqual([]);
+ }, 90000);
+ });
});