diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-10-05 07:04:29 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-10-05 07:04:29 +0100 |
| commit | 74a1f7b6b600d4cb53352dde7def374c3125721a (patch) | |
| tree | 70b79ebae61456f6ff2cb02a37351fa9817fb342 /packages/plugins-queue-restate/src/semaphore.ts | |
| parent | 4a580d713621f99abb8baabc9b847ce039d44842 (diff) | |
| download | karakeep-74a1f7b6b600d4cb53352dde7def374c3125721a.tar.zst | |
feat: Restate-based queue plugin (#2011)
* WIP: Initial restate integration
* add retry
* add delay + idempotency
* implement concurrency limits
* add admin stats
* add todos
* add id provider
* handle onComplete failures
* add tests
* add pub key and fix logging
* add priorities
* fail call after retries
* more fixes
* fix retries left
* some refactoring
* fix package.json
* upgrade sdk
* some test cleanups
Diffstat (limited to 'packages/plugins-queue-restate/src/semaphore.ts')
| -rw-r--r-- | packages/plugins-queue-restate/src/semaphore.ts | 105 |
1 files changed, 105 insertions, 0 deletions
diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts new file mode 100644 index 00000000..253dbe33 --- /dev/null +++ b/packages/plugins-queue-restate/src/semaphore.ts @@ -0,0 +1,105 @@ +// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts + +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +interface QueueItem { + awakeable: string; + priority: number; +} + +interface QueueState { + items: QueueItem[]; + inFlight: number; +} + +export const semaphore = object({ + name: "Semaphore", + handlers: { + acquire: async ( + ctx: ObjectContext<QueueState>, + req: { awakeableId: string; priority: number; capacity: number }, + ): Promise<void> => { + const state = await getState(ctx); + + state.items.push({ + awakeable: req.awakeableId, + priority: req.priority, + }); + + tick(ctx, state, req.capacity); + + setState(ctx, state); + }, + + release: async ( + ctx: ObjectContext<QueueState>, + capacity: number, + ): Promise<void> => { + const state = await getState(ctx); + state.inFlight--; + tick(ctx, state, capacity); + setState(ctx, state); + }, + }, +}); + +function selectAndPopItem(items: QueueItem[]): QueueItem { + let highest = { priority: Number.MIN_SAFE_INTEGER, index: 0 }; + for (const [i, item] of items.entries()) { + if (item.priority > highest.priority) { + highest.priority = item.priority; + highest.index = i; + } + } + const [item] = items.splice(highest.index, 1); + return item; +} + +function tick( + ctx: ObjectContext<QueueState>, + state: QueueState, + capacity: number, +) { + while (state.inFlight < capacity && state.items.length > 0) { + const item = selectAndPopItem(state.items); + state.inFlight++; + ctx.resolveAwakeable(item.awakeable); + } +} + +async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { + return { + items: (await ctx.get("items")) ?? [], + inFlight: (await ctx.get("inFlight")) ?? 0, + }; +} + +function setState(ctx: ObjectContext<QueueState>, state: QueueState) { + ctx.set("items", state.items); + ctx.set("inFlight", state.inFlight); +} + +export class RestateSemaphore { + constructor( + private readonly ctx: Context, + private readonly id: string, + private readonly capacity: number, + ) {} + + async acquire(priority: number) { + const awk = this.ctx.awakeable(); + await this.ctx + .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) + .acquire({ + awakeableId: awk.id, + priority, + capacity: this.capacity, + }); + await awk.promise; + } + async release() { + await this.ctx + .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) + .release(this.capacity); + } +} |
