diff options
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 11 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 3 |
2 files changed, 13 insertions, 1 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 1aef037e..152604dc 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -1,5 +1,6 @@ // Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts +import * as restate from "@restatedev/restate-sdk"; import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; interface QueueItem { @@ -100,7 +101,15 @@ export class RestateSemaphore { priority, capacity: this.capacity, }); - await awk.promise; + + try { + await awk.promise; + } catch (e) { + if (e instanceof restate.CancelledError) { + await this.release(); + throw e; + } + } } async release() { await this.ctx diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 8618b8ab..06ed97f5 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -75,6 +75,9 @@ export function buildRestateService<T, R>( }); await semaphore.release(); if (res.error) { + if (res.error instanceof restate.CancelledError) { + throw res.error; + } lastError = res.error; // TODO: add backoff await ctx.sleep(1000); |
