From 54268759492df88644e4279fdcc600214f922f43 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Mon, 24 Nov 2025 01:23:06 +0000 Subject: feat: Introduce groupId in restate queue (#2168) * feat: Introduce groupId in restate queue * add group ids to the interface * use last served timestamp --- packages/plugins/queue-restate/src/semaphore.ts | 113 +++++++++++++++++++----- 1 file changed, 92 insertions(+), 21 deletions(-) (limited to 'packages/plugins/queue-restate/src/semaphore.ts') diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 152604dc..9acecf28 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -8,21 +8,47 @@ interface QueueItem { priority: number; } -interface QueueState { +interface LegacyQueueState { items: QueueItem[]; + itemsv2: Record; + inFlight: number; +} + +interface QueueState { + groups: Record; inFlight: number; } +interface GroupState { + id: string; + items: QueueItem[]; + lastServedTimestamp: number; +} + export const semaphore = object({ name: "Semaphore", handlers: { acquire: async ( - ctx: ObjectContext, - req: { awakeableId: string; priority: number; capacity: number }, + ctx: ObjectContext, + req: { + awakeableId: string; + priority: number; + capacity: number; + groupId?: string; + }, ): Promise => { const state = await getState(ctx); + req.groupId = req.groupId ?? "__ungrouped__"; + + if (state.groups[req.groupId] === undefined) { + state.groups[req.groupId] = { + id: req.groupId, + items: [], + lastServedTimestamp: Date.now(), + }; + } - state.items.push({ + state.groups[req.groupId].items.push({ awakeable: req.awakeableId, priority: req.priority, }); @@ -33,7 +59,7 @@ export const semaphore = object({ }, release: async ( - ctx: ObjectContext, + ctx: ObjectContext, capacity: number, ): Promise => { const state = await getState(ctx); @@ -49,40 +75,84 @@ export const semaphore = object({ }); // Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(items: QueueItem[]): QueueItem { - let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; - for (const [i, item] of items.entries()) { - if (item.priority < selected.priority) { - selected.priority = item.priority; - selected.index = i; +function selectAndPopItem(state: QueueState): { + item: QueueItem; + groupId: string; +} { + let selected: { + priority: number; + groupId: string; + index: number; + groupLastServedTimestamp: number; + } = { + priority: Number.MAX_SAFE_INTEGER, + groupId: "", + index: 0, + groupLastServedTimestamp: 0, + }; + + for (const [groupId, group] of Object.entries(state.groups)) { + for (const [i, item] of group.items.entries()) { + if (item.priority < selected.priority) { + selected.priority = item.priority; + selected.groupId = groupId; + selected.index = i; + selected.groupLastServedTimestamp = group.lastServedTimestamp; + } else if (item.priority === selected.priority) { + if (group.lastServedTimestamp < selected.groupLastServedTimestamp) { + selected.priority = item.priority; + selected.groupId = groupId; + selected.index = i; + selected.groupLastServedTimestamp = group.lastServedTimestamp; + } + } } } - const [item] = items.splice(selected.index, 1); - return item; + + const [item] = state.groups[selected.groupId].items.splice(selected.index, 1); + state.groups[selected.groupId].lastServedTimestamp = Date.now(); + if (state.groups[selected.groupId].items.length === 0) { + delete state.groups[selected.groupId]; + } + return { item, groupId: selected.groupId }; } function tick( - ctx: ObjectContext, + ctx: ObjectContext, state: QueueState, capacity: number, ) { - while (state.inFlight < capacity && state.items.length > 0) { - const item = selectAndPopItem(state.items); + while (state.inFlight < capacity && Object.keys(state.groups).length > 0) { + const { item } = selectAndPopItem(state); state.inFlight++; ctx.resolveAwakeable(item.awakeable); } } -async function getState(ctx: ObjectContext): Promise { +async function getState( + ctx: ObjectContext, +): Promise { + const groups = (await ctx.get("itemsv2")) ?? {}; + const items = (await ctx.get("items")) ?? []; + + if (items.length > 0) { + groups["__legacy__"] = { + id: "__legacy__", + items, + lastServedTimestamp: 0, + }; + } + return { - items: (await ctx.get("items")) ?? [], + groups, inFlight: (await ctx.get("inFlight")) ?? 0, }; } -function setState(ctx: ObjectContext, state: QueueState) { - ctx.set("items", state.items); +function setState(ctx: ObjectContext, state: QueueState) { + ctx.set("itemsv2", state.groups); ctx.set("inFlight", state.inFlight); + ctx.clear("items"); } export class RestateSemaphore { @@ -92,7 +162,7 @@ export class RestateSemaphore { private readonly capacity: number, ) {} - async acquire(priority: number) { + async acquire(priority: number, groupId?: string) { const awk = this.ctx.awakeable(); await this.ctx .objectClient({ name: "Semaphore" }, this.id) @@ -100,6 +170,7 @@ export class RestateSemaphore { awakeableId: awk.id, priority, capacity: this.capacity, + groupId, }); try { -- cgit v1.2.3-70-g09d2