diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-24 01:23:06 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-24 01:23:06 +0000 |
| commit | 54268759492df88644e4279fdcc600214f922f43 (patch) | |
| tree | 610cb784662ca2c7513f67aa7a74f3a8d3e40b3f /packages/plugins/queue-restate/src/semaphore.ts | |
| parent | 38842f77e549fd0946e43a40c65abe0f196c3f04 (diff) | |
| download | karakeep-54268759492df88644e4279fdcc600214f922f43.tar.zst | |
feat: Introduce groupId in restate queue (#2168)
* feat: Introduce groupId in restate queue
* add group ids to the interface
* use last served timestamp
Diffstat (limited to 'packages/plugins/queue-restate/src/semaphore.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 113 |
1 files changed, 92 insertions, 21 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 152604dc..9acecf28 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -8,21 +8,47 @@ interface QueueItem { priority: number; } -interface QueueState { +interface LegacyQueueState { items: QueueItem[]; + itemsv2: Record<string, GroupState>; + inFlight: number; +} + +interface QueueState { + groups: Record<string, GroupState>; inFlight: number; } +interface GroupState { + id: string; + items: QueueItem[]; + lastServedTimestamp: number; +} + export const semaphore = object({ name: "Semaphore", handlers: { acquire: async ( - ctx: ObjectContext<QueueState>, - req: { awakeableId: string; priority: number; capacity: number }, + ctx: ObjectContext<LegacyQueueState>, + req: { + awakeableId: string; + priority: number; + capacity: number; + groupId?: string; + }, ): Promise<void> => { const state = await getState(ctx); + req.groupId = req.groupId ?? "__ungrouped__"; + + if (state.groups[req.groupId] === undefined) { + state.groups[req.groupId] = { + id: req.groupId, + items: [], + lastServedTimestamp: Date.now(), + }; + } - state.items.push({ + state.groups[req.groupId].items.push({ awakeable: req.awakeableId, priority: req.priority, }); @@ -33,7 +59,7 @@ export const semaphore = object({ }, release: async ( - ctx: ObjectContext<QueueState>, + ctx: ObjectContext<LegacyQueueState>, capacity: number, ): Promise<void> => { const state = await getState(ctx); @@ -49,40 +75,84 @@ export const semaphore = object({ }); // Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(items: QueueItem[]): QueueItem { - let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; - for (const [i, item] of items.entries()) { - if (item.priority < selected.priority) { - selected.priority = item.priority; - selected.index = i; +function selectAndPopItem(state: QueueState): { + item: QueueItem; + groupId: string; +} { + let selected: { + priority: number; + groupId: string; + index: number; + groupLastServedTimestamp: number; + } = { + priority: Number.MAX_SAFE_INTEGER, + groupId: "", + index: 0, + groupLastServedTimestamp: 0, + }; + + for (const [groupId, group] of Object.entries(state.groups)) { + for (const [i, item] of group.items.entries()) { + if (item.priority < selected.priority) { + selected.priority = item.priority; + selected.groupId = groupId; + selected.index = i; + selected.groupLastServedTimestamp = group.lastServedTimestamp; + } else if (item.priority === selected.priority) { + if (group.lastServedTimestamp < selected.groupLastServedTimestamp) { + selected.priority = item.priority; + selected.groupId = groupId; + selected.index = i; + selected.groupLastServedTimestamp = group.lastServedTimestamp; + } + } } } - const [item] = items.splice(selected.index, 1); - return item; + + const [item] = state.groups[selected.groupId].items.splice(selected.index, 1); + state.groups[selected.groupId].lastServedTimestamp = Date.now(); + if (state.groups[selected.groupId].items.length === 0) { + delete state.groups[selected.groupId]; + } + return { item, groupId: selected.groupId }; } function tick( - ctx: ObjectContext<QueueState>, + ctx: ObjectContext<LegacyQueueState>, state: QueueState, capacity: number, ) { - while (state.inFlight < capacity && state.items.length > 0) { - const item = selectAndPopItem(state.items); + while (state.inFlight < capacity && Object.keys(state.groups).length > 0) { + const { item } = selectAndPopItem(state); state.inFlight++; ctx.resolveAwakeable(item.awakeable); } } -async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { +async function getState( + ctx: ObjectContext<LegacyQueueState>, +): Promise<QueueState> { + const groups = (await ctx.get("itemsv2")) ?? {}; + const items = (await ctx.get("items")) ?? []; + + if (items.length > 0) { + groups["__legacy__"] = { + id: "__legacy__", + items, + lastServedTimestamp: 0, + }; + } + return { - items: (await ctx.get("items")) ?? [], + groups, inFlight: (await ctx.get("inFlight")) ?? 0, }; } -function setState(ctx: ObjectContext<QueueState>, state: QueueState) { - ctx.set("items", state.items); +function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) { + ctx.set("itemsv2", state.groups); ctx.set("inFlight", state.inFlight); + ctx.clear("items"); } export class RestateSemaphore { @@ -92,7 +162,7 @@ export class RestateSemaphore { private readonly capacity: number, ) {} - async acquire(priority: number) { + async acquire(priority: number, groupId?: string) { const awk = this.ctx.awakeable(); await this.ctx .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) @@ -100,6 +170,7 @@ export class RestateSemaphore { awakeableId: awk.id, priority, capacity: this.capacity, + groupId, }); try { |
