aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/semaphore.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-24 00:45:29 +0000
committerMohamed Bassem <me@mbassem.com>2025-11-24 00:47:03 +0000
commit38842f77e549fd0946e43a40c65abe0f196c3f04 (patch)
tree52412732dcd5547df33b569b0de3829537c54254 /packages/plugins/queue-restate/src/semaphore.ts
parent6912d0dd4e399bf59a080fd84f118f6185758e3a (diff)
downloadkarakeep-38842f77e549fd0946e43a40c65abe0f196c3f04.tar.zst
fix: support invocation cancellation while awaiting sempahore
Diffstat (limited to 'packages/plugins/queue-restate/src/semaphore.ts')
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts11
1 files changed, 10 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 1aef037e..152604dc 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -1,5 +1,6 @@
// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts
+import * as restate from "@restatedev/restate-sdk";
import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
interface QueueItem {
@@ -100,7 +101,15 @@ export class RestateSemaphore {
priority,
capacity: this.capacity,
});
- await awk.promise;
+
+ try {
+ await awk.promise;
+ } catch (e) {
+ if (e instanceof restate.CancelledError) {
+ await this.release();
+ throw e;
+ }
+ }
}
async release() {
await this.ctx