aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/tests/queue.test.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-01-10 15:31:30 +0000
committerGitHub <noreply@github.com>2026-01-10 15:31:30 +0000
commitf48e98e16ae588ee5004531bf9a5aed757ed3786 (patch)
treefc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/tests/queue.test.ts
parentaace8864d7eab5c858a92064b0ac59c122377830 (diff)
downloadkarakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes * fix: guard meilisearch client init with mutex * fix: fix propagation of last error in restate * fix: don't fail invocations when the job fails * fix: add a timeout around the worker runner logic * fix: add leases to handle dangling semaphores * feat: separate dispatchers and runners * add a test * fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src/tests/queue.test.ts')
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts88
1 files changed, 88 insertions, 0 deletions
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 7da3f18e..3c4cf254 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -56,6 +56,12 @@ type TestAction =
val: number;
delayMs: number;
attemptsBeforeSuccess: number;
+ }
+ | {
+ type: "timeout-then-succeed";
+ val: number;
+ timeoutDurSec: number;
+ attemptsBeforeSuccess: number;
};
describe("Restate Queue Provider", () => {
@@ -70,6 +76,7 @@ describe("Restate Queue Provider", () => {
maxInFlight: 0,
baton: new Baton(),
rateLimitAttempts: new Map<string, number>(),
+ timeoutAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
@@ -90,6 +97,7 @@ describe("Restate Queue Provider", () => {
testState.maxInFlight = 0;
testState.baton = new Baton();
testState.rateLimitAttempts = new Map<string, number>();
+ testState.timeoutAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -157,6 +165,22 @@ describe("Restate Queue Provider", () => {
}
return jobData.val;
}
+ case "timeout-then-succeed": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.timeoutAttempts.get(attemptKey) || 0;
+ testState.timeoutAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ // Stall longer than the timeout to trigger a timeout
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.timeoutDurSec * 1000),
+ );
+ // This should not be reached if timeout works correctly
+ throw new Error("Should have timed out");
+ }
+ return jobData.val;
+ }
}
},
onError: async (job) => {
@@ -612,4 +636,68 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([]);
}, 90000);
});
+
+ describe("Timeout handling", () => {
+ it("should retry timed out jobs and not waste semaphore slots", async () => {
+ // This test verifies that:
+ // 1. Jobs that timeout get retried correctly
+ // 2. Semaphore slots are freed when jobs timeout (via lease expiry)
+ // 3. Other jobs can still run while a job is being retried
+
+ // Enqueue a job that will timeout on first attempt, succeed on second
+ // timeoutSecs is 2, so we stall for 5 seconds to ensure timeout
+ await queue.enqueue({
+ type: "timeout-then-succeed",
+ val: 42,
+ timeoutDurSec: 5,
+ attemptsBeforeSuccess: 1, // Timeout once, then succeed
+ });
+
+ // Wait a bit for the first attempt to start
+ await new Promise((resolve) => setTimeout(resolve, 500));
+
+ // Enqueue more jobs to verify semaphore slots are eventually freed
+ // With concurrency=3, these should be able to run after the timeout
+ await queue.enqueue({ type: "val", val: 100 });
+ await queue.enqueue({ type: "val", val: 101 });
+ await queue.enqueue({ type: "val", val: 102 });
+
+ await waitUntilQueueEmpty();
+
+ // The timeout job should have succeeded after retry
+ expect(testState.results).toContain(42);
+
+ // All other jobs should have completed
+ expect(testState.results).toContain(100);
+ expect(testState.results).toContain(101);
+ expect(testState.results).toContain(102);
+
+ // The timeout job should have been attempted twice
+ const attempts = Array.from(testState.timeoutAttempts.values())[0];
+ expect(attempts).toBe(2);
+
+ // Concurrency should not have exceeded the limit
+ expect(testState.maxInFlight).toBeLessThanOrEqual(3);
+ }, 120000);
+
+ it("should handle job that times out multiple times before succeeding", async () => {
+ // Enqueue a single job that times out twice before succeeding
+ // This tests that the retry mechanism works correctly for timeouts
+ await queue.enqueue({
+ type: "timeout-then-succeed",
+ val: 99,
+ timeoutDurSec: 5,
+ attemptsBeforeSuccess: 2, // Timeout twice, then succeed
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Job should eventually succeed
+ expect(testState.results).toEqual([99]);
+
+ // Should have been attempted 3 times (2 timeouts + 1 success)
+ const attempts = Array.from(testState.timeoutAttempts.values())[0];
+ expect(attempts).toBe(3);
+ }, 180000);
+ });
});