diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-01-10 15:31:30 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-10 15:31:30 +0000 |
| commit | f48e98e16ae588ee5004531bf9a5aed757ed3786 (patch) | |
| tree | fc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/tests/queue.test.ts | |
| parent | aace8864d7eab5c858a92064b0ac59c122377830 (diff) | |
| download | karakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst | |
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes
* fix: guard meilisearch client init with mutex
* fix: fix propagation of last error in restate
* fix: don't fail invocations when the job fails
* fix: add a timeout around the worker runner logic
* fix: add leases to handle dangling semaphores
* feat: separate dispatchers and runners
* add a test
* fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 7da3f18e..3c4cf254 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -56,6 +56,12 @@ type TestAction = val: number; delayMs: number; attemptsBeforeSuccess: number; + } + | { + type: "timeout-then-succeed"; + val: number; + timeoutDurSec: number; + attemptsBeforeSuccess: number; }; describe("Restate Queue Provider", () => { @@ -70,6 +76,7 @@ describe("Restate Queue Provider", () => { maxInFlight: 0, baton: new Baton(), rateLimitAttempts: new Map<string, number>(), + timeoutAttempts: new Map<string, number>(), }; async function waitUntilQueueEmpty() { @@ -90,6 +97,7 @@ describe("Restate Queue Provider", () => { testState.maxInFlight = 0; testState.baton = new Baton(); testState.rateLimitAttempts = new Map<string, number>(); + testState.timeoutAttempts = new Map<string, number>(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -157,6 +165,22 @@ describe("Restate Queue Provider", () => { } return jobData.val; } + case "timeout-then-succeed": { + const attemptKey = `${job.id}`; + const currentAttempts = + testState.timeoutAttempts.get(attemptKey) || 0; + testState.timeoutAttempts.set(attemptKey, currentAttempts + 1); + + if (currentAttempts < jobData.attemptsBeforeSuccess) { + // Stall longer than the timeout to trigger a timeout + await new Promise((resolve) => + setTimeout(resolve, jobData.timeoutDurSec * 1000), + ); + // This should not be reached if timeout works correctly + throw new Error("Should have timed out"); + } + return jobData.val; + } } }, onError: async (job) => { @@ -612,4 +636,68 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([]); }, 90000); }); + + describe("Timeout handling", () => { + it("should retry timed out jobs and not waste semaphore slots", async () => { + // This test verifies that: + // 1. Jobs that timeout get retried correctly + // 2. Semaphore slots are freed when jobs timeout (via lease expiry) + // 3. Other jobs can still run while a job is being retried + + // Enqueue a job that will timeout on first attempt, succeed on second + // timeoutSecs is 2, so we stall for 5 seconds to ensure timeout + await queue.enqueue({ + type: "timeout-then-succeed", + val: 42, + timeoutDurSec: 5, + attemptsBeforeSuccess: 1, // Timeout once, then succeed + }); + + // Wait a bit for the first attempt to start + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Enqueue more jobs to verify semaphore slots are eventually freed + // With concurrency=3, these should be able to run after the timeout + await queue.enqueue({ type: "val", val: 100 }); + await queue.enqueue({ type: "val", val: 101 }); + await queue.enqueue({ type: "val", val: 102 }); + + await waitUntilQueueEmpty(); + + // The timeout job should have succeeded after retry + expect(testState.results).toContain(42); + + // All other jobs should have completed + expect(testState.results).toContain(100); + expect(testState.results).toContain(101); + expect(testState.results).toContain(102); + + // The timeout job should have been attempted twice + const attempts = Array.from(testState.timeoutAttempts.values())[0]; + expect(attempts).toBe(2); + + // Concurrency should not have exceeded the limit + expect(testState.maxInFlight).toBeLessThanOrEqual(3); + }, 120000); + + it("should handle job that times out multiple times before succeeding", async () => { + // Enqueue a single job that times out twice before succeeding + // This tests that the retry mechanism works correctly for timeouts + await queue.enqueue({ + type: "timeout-then-succeed", + val: 99, + timeoutDurSec: 5, + attemptsBeforeSuccess: 2, // Timeout twice, then succeed + }); + + await waitUntilQueueEmpty(); + + // Job should eventually succeed + expect(testState.results).toEqual([99]); + + // Should have been attempted 3 times (2 timeouts + 1 success) + const attempts = Array.from(testState.timeoutAttempts.values())[0]; + expect(attempts).toBe(3); + }, 180000); + }); }); |
