aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-12-15 00:02:24 +0000
committerMohamed Bassem <me@mbassem.com>2025-12-15 00:02:24 +0000
commit6db14ac492cd5d9e26d0d986513771f14faa7fd0 (patch)
tree341c22a8672282c12a3c0a45d6f1b18c098caf86
parentd7357118b3f60e4bcce0e5f76629fb6d32acd311 (diff)
downloadkarakeep-6db14ac492cd5d9e26d0d986513771f14faa7fd0.tar.zst
fix: fix restate service to return control to restate service on timeout
-rw-r--r--packages/plugins/queue-restate/src/service.ts9
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts13
2 files changed, 21 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..fb7e08c2 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -121,7 +121,14 @@ async function runWorkerLogic<T, R>(
ctx.run(
`main logic`,
async () => {
- return await run(data);
+ return await Promise.race([
+ run(data),
+ new Promise<R>((_, reject) => {
+ data.abortSignal.addEventListener("abort", () => {
+ reject(new Error(`Job didn't complete within timeout`));
+ });
+ }),
+ ]);
},
{
maxRetryAttempts: 1,
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..763671d5 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -517,4 +517,17 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([102, 101, 100]);
}, 60000);
});
+
+ describe("inactivity timeout", () => {
+ it("should timeout jobs that don't complete", async () => {
+ await queue.enqueue({ type: "stall", durSec: 6 });
+ await queue.enqueue({ type: "stall", durSec: 6 });
+ await queue.enqueue({ type: "stall", durSec: 6 });
+ await queue.enqueue({ type: "val", val: 1 });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([1]);
+ });
+ });
});