From 74a1f7b6b600d4cb53352dde7def374c3125721a Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 5 Oct 2025 07:04:29 +0100 Subject: feat: Restate-based queue plugin (#2011) * WIP: Initial restate integration * add retry * add delay + idempotency * implement concurrency limits * add admin stats * add todos * add id provider * handle onComplete failures * add tests * add pub key and fix logging * add priorities * fail call after retries * more fixes * fix retries left * some refactoring * fix package.json * upgrade sdk * some test cleanups --- packages/plugins-queue-restate/src/semaphore.ts | 105 ++++++++++++++++++++++++ 1 file changed, 105 insertions(+) create mode 100644 packages/plugins-queue-restate/src/semaphore.ts (limited to 'packages/plugins-queue-restate/src/semaphore.ts') diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts new file mode 100644 index 00000000..253dbe33 --- /dev/null +++ b/packages/plugins-queue-restate/src/semaphore.ts @@ -0,0 +1,105 @@ +// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts + +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +interface QueueItem { + awakeable: string; + priority: number; +} + +interface QueueState { + items: QueueItem[]; + inFlight: number; +} + +export const semaphore = object({ + name: "Semaphore", + handlers: { + acquire: async ( + ctx: ObjectContext, + req: { awakeableId: string; priority: number; capacity: number }, + ): Promise => { + const state = await getState(ctx); + + state.items.push({ + awakeable: req.awakeableId, + priority: req.priority, + }); + + tick(ctx, state, req.capacity); + + setState(ctx, state); + }, + + release: async ( + ctx: ObjectContext, + capacity: number, + ): Promise => { + const state = await getState(ctx); + state.inFlight--; + tick(ctx, state, capacity); + setState(ctx, state); + }, + }, +}); + +function selectAndPopItem(items: QueueItem[]): QueueItem { + let highest = { priority: Number.MIN_SAFE_INTEGER, index: 0 }; + for (const [i, item] of items.entries()) { + if (item.priority > highest.priority) { + highest.priority = item.priority; + highest.index = i; + } + } + const [item] = items.splice(highest.index, 1); + return item; +} + +function tick( + ctx: ObjectContext, + state: QueueState, + capacity: number, +) { + while (state.inFlight < capacity && state.items.length > 0) { + const item = selectAndPopItem(state.items); + state.inFlight++; + ctx.resolveAwakeable(item.awakeable); + } +} + +async function getState(ctx: ObjectContext): Promise { + return { + items: (await ctx.get("items")) ?? [], + inFlight: (await ctx.get("inFlight")) ?? 0, + }; +} + +function setState(ctx: ObjectContext, state: QueueState) { + ctx.set("items", state.items); + ctx.set("inFlight", state.inFlight); +} + +export class RestateSemaphore { + constructor( + private readonly ctx: Context, + private readonly id: string, + private readonly capacity: number, + ) {} + + async acquire(priority: number) { + const awk = this.ctx.awakeable(); + await this.ctx + .objectClient({ name: "Semaphore" }, this.id) + .acquire({ + awakeableId: awk.id, + priority, + capacity: this.capacity, + }); + await awk.promise; + } + async release() { + await this.ctx + .objectClient({ name: "Semaphore" }, this.id) + .release(this.capacity); + } +} -- cgit v1.2.3-70-g09d2