aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/semaphore.ts
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts207
1 files changed, 136 insertions, 71 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 694c77b3..72fa938a 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -7,17 +7,20 @@ interface QueueItem {
awakeable: string;
idempotencyKey?: string;
priority: number;
+ leaseDurationMs: number;
}
interface LegacyQueueState {
- items: QueueItem[];
itemsv2: Record<string, GroupState>;
inFlight: number;
+ paused: boolean;
+ leases: Record<string, number>;
}
interface QueueState {
groups: Record<string, GroupState>;
- inFlight: number;
+ paused: boolean;
+ leases: Record<string, number>;
}
interface GroupState {
@@ -29,65 +32,115 @@ interface GroupState {
export const semaphore = object({
name: "Semaphore",
handlers: {
- acquire: async (
- ctx: ObjectContext<LegacyQueueState>,
- req: {
- awakeableId: string;
- priority: number;
- capacity: number;
- groupId?: string;
- idempotencyKey?: string;
+ acquire: restate.handlers.object.exclusive(
+ {
+ ingressPrivate: true,
},
- ): Promise<boolean> => {
- const state = await getState(ctx);
-
- if (
- req.idempotencyKey &&
- idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
- ) {
- return false;
- }
+ async (
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ awakeableId: string;
+ priority: number;
+ capacity: number;
+ leaseDurationMs: number;
+ groupId?: string;
+ idempotencyKey?: string;
+ },
+ ): Promise<boolean> => {
+ const state = await getState(ctx);
- req.groupId = req.groupId ?? "__ungrouped__";
+ if (
+ req.idempotencyKey &&
+ idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
+ ) {
+ return false;
+ }
- if (state.groups[req.groupId] === undefined) {
- state.groups[req.groupId] = {
- id: req.groupId,
- items: [],
- lastServedTimestamp: Date.now(),
- };
- }
+ req.groupId = req.groupId ?? "__ungrouped__";
- state.groups[req.groupId].items.push({
- awakeable: req.awakeableId,
- priority: req.priority,
- idempotencyKey: req.idempotencyKey,
- });
+ if (state.groups[req.groupId] === undefined) {
+ state.groups[req.groupId] = {
+ id: req.groupId,
+ items: [],
+ lastServedTimestamp: await ctx.date.now(),
+ };
+ }
- tick(ctx, state, req.capacity);
+ state.groups[req.groupId].items.push({
+ awakeable: req.awakeableId,
+ priority: req.priority,
+ idempotencyKey: req.idempotencyKey,
+ leaseDurationMs: req.leaseDurationMs,
+ });
- setState(ctx, state);
- return true;
- },
-
- release: async (
- ctx: ObjectContext<LegacyQueueState>,
- capacity: number,
- ): Promise<void> => {
- const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
- setState(ctx, state);
- },
+ await tick(ctx, state, req.capacity);
+
+ setState(ctx, state);
+ return true;
+ },
+ ),
+
+ release: restate.handlers.object.exclusive(
+ {
+ ingressPrivate: true,
+ },
+ async (
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ leaseId: string;
+ capacity: number;
+ },
+ ): Promise<void> => {
+ const state = await getState(ctx);
+ delete state.leases[req.leaseId];
+ await tick(ctx, state, req.capacity);
+ setState(ctx, state);
+ },
+ ),
+ pause: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.paused = true;
+ setState(ctx, state);
+ },
+ ),
+ resume: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.paused = false;
+ await tick(ctx, state, 1);
+ setState(ctx, state);
+ },
+ ),
+ resetInflight: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.leases = {};
+ setState(ctx, state);
+ },
+ ),
+ tick: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ await tick(ctx, state, 1);
+ setState(ctx, state);
+ },
+ ),
},
options: {
- ingressPrivate: true,
journalRetention: 0,
},
});
// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(state: QueueState): {
+function selectAndPopItem(
+ state: QueueState,
+ now: number,
+): {
item: QueueItem;
groupId: string;
} {
@@ -122,21 +175,37 @@ function selectAndPopItem(state: QueueState): {
}
const [item] = state.groups[selected.groupId].items.splice(selected.index, 1);
- state.groups[selected.groupId].lastServedTimestamp = Date.now();
+ state.groups[selected.groupId].lastServedTimestamp = now;
if (state.groups[selected.groupId].items.length === 0) {
delete state.groups[selected.groupId];
}
return { item, groupId: selected.groupId };
}
-function tick(
+function pruneExpiredLeases(state: QueueState, now: number) {
+ for (const [leaseId, expiry] of Object.entries(state.leases)) {
+ if (expiry <= now) {
+ delete state.leases[leaseId];
+ }
+ }
+ return Object.keys(state.leases).length;
+}
+
+async function tick(
ctx: ObjectContext<LegacyQueueState>,
state: QueueState,
capacity: number,
-) {
- while (state.inFlight < capacity && Object.keys(state.groups).length > 0) {
- const { item } = selectAndPopItem(state);
- state.inFlight++;
+): Promise<void> {
+ let activeLeases = pruneExpiredLeases(state, await ctx.date.now());
+ while (
+ !state.paused &&
+ activeLeases < capacity &&
+ Object.keys(state.groups).length > 0
+ ) {
+ const now = await ctx.date.now();
+ const { item } = selectAndPopItem(state, now);
+ state.leases[item.awakeable] = now + item.leaseDurationMs;
+ activeLeases++;
ctx.resolveAwakeable(item.awakeable);
}
}
@@ -145,19 +214,13 @@ async function getState(
ctx: ObjectContext<LegacyQueueState>,
): Promise<QueueState> {
const groups = (await ctx.get("itemsv2")) ?? {};
- const items = (await ctx.get("items")) ?? [];
-
- if (items.length > 0) {
- groups["__legacy__"] = {
- id: "__legacy__",
- items,
- lastServedTimestamp: 0,
- };
- }
+ const paused = (await ctx.get("paused")) ?? false;
+ const leases = (await ctx.get("leases")) ?? {};
return {
groups,
- inFlight: (await ctx.get("inFlight")) ?? 0,
+ paused,
+ leases,
};
}
@@ -175,8 +238,8 @@ function idempotencyKeyAlreadyExists(
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
- ctx.set("inFlight", state.inFlight);
- ctx.clear("items");
+ ctx.set("leases", state.leases);
+ ctx.set("paused", state.paused);
}
export class RestateSemaphore {
@@ -184,6 +247,7 @@ export class RestateSemaphore {
private readonly ctx: Context,
private readonly id: string,
private readonly capacity: number,
+ private readonly leaseDurationMs: number,
) {}
async acquire(priority: number, groupId?: string, idempotencyKey?: string) {
@@ -194,6 +258,7 @@ export class RestateSemaphore {
awakeableId: awk.id,
priority,
capacity: this.capacity,
+ leaseDurationMs: this.leaseDurationMs,
groupId,
idempotencyKey,
});
@@ -206,15 +271,15 @@ export class RestateSemaphore {
await awk.promise;
} catch (e) {
if (e instanceof restate.CancelledError) {
- await this.release();
- throw e;
+ await this.release(awk.id);
}
+ throw e;
}
- return true;
+ return awk.id;
}
- async release() {
+ async release(leaseId: string) {
await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .release(this.capacity);
+ .release({ leaseId, capacity: this.capacity });
}
}