diff options
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 9 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 13 |
2 files changed, 21 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..fb7e08c2 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -121,7 +121,14 @@ async function runWorkerLogic<T, R>( ctx.run( `main logic`, async () => { - return await run(data); + return await Promise.race([ + run(data), + new Promise<R>((_, reject) => { + data.abortSignal.addEventListener("abort", () => { + reject(new Error(`Job didn't complete within timeout`)); + }); + }), + ]); }, { maxRetryAttempts: 1, diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..763671d5 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -517,4 +517,17 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("inactivity timeout", () => { + it("should timeout jobs that don't complete", async () => { + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "val", val: 1 }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([1]); + }); + }); }); |
