diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-24 00:45:29 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-11-24 00:47:03 +0000 |
| commit | 38842f77e549fd0946e43a40c65abe0f196c3f04 (patch) | |
| tree | 52412732dcd5547df33b569b0de3829537c54254 | |
| parent | 6912d0dd4e399bf59a080fd84f118f6185758e3a (diff) | |
| download | karakeep-38842f77e549fd0946e43a40c65abe0f196c3f04.tar.zst | |
fix: support invocation cancellation while awaiting sempahore
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 11 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 3 |
2 files changed, 13 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 1aef037e..152604dc 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -1,5 +1,6 @@ // Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts +import * as restate from "@restatedev/restate-sdk"; import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; interface QueueItem { @@ -100,7 +101,15 @@ export class RestateSemaphore { priority, capacity: this.capacity, }); - await awk.promise; + + try { + await awk.promise; + } catch (e) { + if (e instanceof restate.CancelledError) { + await this.release(); + throw e; + } + } } async release() { await this.ctx diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 8618b8ab..06ed97f5 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -75,6 +75,9 @@ export function buildRestateService<T, R>( }); await semaphore.release(); if (res.error) { + if (res.error instanceof restate.CancelledError) { + throw res.error; + } lastError = res.error; // TODO: add backoff await ctx.sleep(1000); |
