diff options
Diffstat (limited to '')
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 207 |
1 files changed, 136 insertions, 71 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 694c77b3..72fa938a 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -7,17 +7,20 @@ interface QueueItem { awakeable: string; idempotencyKey?: string; priority: number; + leaseDurationMs: number; } interface LegacyQueueState { - items: QueueItem[]; itemsv2: Record<string, GroupState>; inFlight: number; + paused: boolean; + leases: Record<string, number>; } interface QueueState { groups: Record<string, GroupState>; - inFlight: number; + paused: boolean; + leases: Record<string, number>; } interface GroupState { @@ -29,65 +32,115 @@ interface GroupState { export const semaphore = object({ name: "Semaphore", handlers: { - acquire: async ( - ctx: ObjectContext<LegacyQueueState>, - req: { - awakeableId: string; - priority: number; - capacity: number; - groupId?: string; - idempotencyKey?: string; + acquire: restate.handlers.object.exclusive( + { + ingressPrivate: true, }, - ): Promise<boolean> => { - const state = await getState(ctx); - - if ( - req.idempotencyKey && - idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey) - ) { - return false; - } + async ( + ctx: ObjectContext<LegacyQueueState>, + req: { + awakeableId: string; + priority: number; + capacity: number; + leaseDurationMs: number; + groupId?: string; + idempotencyKey?: string; + }, + ): Promise<boolean> => { + const state = await getState(ctx); - req.groupId = req.groupId ?? "__ungrouped__"; + if ( + req.idempotencyKey && + idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey) + ) { + return false; + } - if (state.groups[req.groupId] === undefined) { - state.groups[req.groupId] = { - id: req.groupId, - items: [], - lastServedTimestamp: Date.now(), - }; - } + req.groupId = req.groupId ?? "__ungrouped__"; - state.groups[req.groupId].items.push({ - awakeable: req.awakeableId, - priority: req.priority, - idempotencyKey: req.idempotencyKey, - }); + if (state.groups[req.groupId] === undefined) { + state.groups[req.groupId] = { + id: req.groupId, + items: [], + lastServedTimestamp: await ctx.date.now(), + }; + } - tick(ctx, state, req.capacity); + state.groups[req.groupId].items.push({ + awakeable: req.awakeableId, + priority: req.priority, + idempotencyKey: req.idempotencyKey, + leaseDurationMs: req.leaseDurationMs, + }); - setState(ctx, state); - return true; - }, - - release: async ( - ctx: ObjectContext<LegacyQueueState>, - capacity: number, - ): Promise<void> => { - const state = await getState(ctx); - state.inFlight--; - tick(ctx, state, capacity); - setState(ctx, state); - }, + await tick(ctx, state, req.capacity); + + setState(ctx, state); + return true; + }, + ), + + release: restate.handlers.object.exclusive( + { + ingressPrivate: true, + }, + async ( + ctx: ObjectContext<LegacyQueueState>, + req: { + leaseId: string; + capacity: number; + }, + ): Promise<void> => { + const state = await getState(ctx); + delete state.leases[req.leaseId]; + await tick(ctx, state, req.capacity); + setState(ctx, state); + }, + ), + pause: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + state.paused = true; + setState(ctx, state); + }, + ), + resume: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + state.paused = false; + await tick(ctx, state, 1); + setState(ctx, state); + }, + ), + resetInflight: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + state.leases = {}; + setState(ctx, state); + }, + ), + tick: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + await tick(ctx, state, 1); + setState(ctx, state); + }, + ), }, options: { - ingressPrivate: true, journalRetention: 0, }, }); // Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(state: QueueState): { +function selectAndPopItem( + state: QueueState, + now: number, +): { item: QueueItem; groupId: string; } { @@ -122,21 +175,37 @@ function selectAndPopItem(state: QueueState): { } const [item] = state.groups[selected.groupId].items.splice(selected.index, 1); - state.groups[selected.groupId].lastServedTimestamp = Date.now(); + state.groups[selected.groupId].lastServedTimestamp = now; if (state.groups[selected.groupId].items.length === 0) { delete state.groups[selected.groupId]; } return { item, groupId: selected.groupId }; } -function tick( +function pruneExpiredLeases(state: QueueState, now: number) { + for (const [leaseId, expiry] of Object.entries(state.leases)) { + if (expiry <= now) { + delete state.leases[leaseId]; + } + } + return Object.keys(state.leases).length; +} + +async function tick( ctx: ObjectContext<LegacyQueueState>, state: QueueState, capacity: number, -) { - while (state.inFlight < capacity && Object.keys(state.groups).length > 0) { - const { item } = selectAndPopItem(state); - state.inFlight++; +): Promise<void> { + let activeLeases = pruneExpiredLeases(state, await ctx.date.now()); + while ( + !state.paused && + activeLeases < capacity && + Object.keys(state.groups).length > 0 + ) { + const now = await ctx.date.now(); + const { item } = selectAndPopItem(state, now); + state.leases[item.awakeable] = now + item.leaseDurationMs; + activeLeases++; ctx.resolveAwakeable(item.awakeable); } } @@ -145,19 +214,13 @@ async function getState( ctx: ObjectContext<LegacyQueueState>, ): Promise<QueueState> { const groups = (await ctx.get("itemsv2")) ?? {}; - const items = (await ctx.get("items")) ?? []; - - if (items.length > 0) { - groups["__legacy__"] = { - id: "__legacy__", - items, - lastServedTimestamp: 0, - }; - } + const paused = (await ctx.get("paused")) ?? false; + const leases = (await ctx.get("leases")) ?? {}; return { groups, - inFlight: (await ctx.get("inFlight")) ?? 0, + paused, + leases, }; } @@ -175,8 +238,8 @@ function idempotencyKeyAlreadyExists( function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) { ctx.set("itemsv2", state.groups); - ctx.set("inFlight", state.inFlight); - ctx.clear("items"); + ctx.set("leases", state.leases); + ctx.set("paused", state.paused); } export class RestateSemaphore { @@ -184,6 +247,7 @@ export class RestateSemaphore { private readonly ctx: Context, private readonly id: string, private readonly capacity: number, + private readonly leaseDurationMs: number, ) {} async acquire(priority: number, groupId?: string, idempotencyKey?: string) { @@ -194,6 +258,7 @@ export class RestateSemaphore { awakeableId: awk.id, priority, capacity: this.capacity, + leaseDurationMs: this.leaseDurationMs, groupId, idempotencyKey, }); @@ -206,15 +271,15 @@ export class RestateSemaphore { await awk.promise; } catch (e) { if (e instanceof restate.CancelledError) { - await this.release(); - throw e; + await this.release(awk.id); } + throw e; } - return true; + return awk.id; } - async release() { + async release(leaseId: string) { await this.ctx .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .release(this.capacity); + .release({ leaseId, capacity: this.capacity }); } } |
