From 6db14ac492cd5d9e26d0d986513771f14faa7fd0 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Mon, 15 Dec 2025 00:02:24 +0000 Subject: fix: fix restate service to return control to restate service on timeout --- packages/plugins/queue-restate/src/service.ts | 9 ++++++++- packages/plugins/queue-restate/src/tests/queue.test.ts | 13 +++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..fb7e08c2 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -121,7 +121,14 @@ async function runWorkerLogic( ctx.run( `main logic`, async () => { - return await run(data); + return await Promise.race([ + run(data), + new Promise((_, reject) => { + data.abortSignal.addEventListener("abort", () => { + reject(new Error(`Job didn't complete within timeout`)); + }); + }), + ]); }, { maxRetryAttempts: 1, diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..763671d5 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -517,4 +517,17 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("inactivity timeout", () => { + it("should timeout jobs that don't complete", async () => { + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "val", val: 1 }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([1]); + }); + }); }); -- cgit v1.2.3-70-g09d2