diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-12-15 00:02:24 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-12-15 00:02:24 +0000 |
| commit | 6db14ac492cd5d9e26d0d986513771f14faa7fd0 (patch) | |
| tree | 341c22a8672282c12a3c0a45d6f1b18c098caf86 /packages | |
| parent | d7357118b3f60e4bcce0e5f76629fb6d32acd311 (diff) | |
| download | karakeep-6db14ac492cd5d9e26d0d986513771f14faa7fd0.tar.zst | |
fix: fix restate service to return control to restate service on timeout
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 9 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 13 |
2 files changed, 21 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..fb7e08c2 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -121,7 +121,14 @@ async function runWorkerLogic<T, R>( ctx.run( `main logic`, async () => { - return await run(data); + return await Promise.race([ + run(data), + new Promise<R>((_, reject) => { + data.abortSignal.addEventListener("abort", () => { + reject(new Error(`Job didn't complete within timeout`)); + }); + }), + ]); }, { maxRetryAttempts: 1, diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..763671d5 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -517,4 +517,17 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("inactivity timeout", () => { + it("should timeout jobs that don't complete", async () => { + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "stall", durSec: 6 }); + await queue.enqueue({ type: "val", val: 1 }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([1]); + }); + }); }); |
