aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/semaphore.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins/queue-restate/src/semaphore.ts')
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts113
1 files changed, 92 insertions, 21 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 152604dc..9acecf28 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -8,21 +8,47 @@ interface QueueItem {
priority: number;
}
-interface QueueState {
+interface LegacyQueueState {
items: QueueItem[];
+ itemsv2: Record<string, GroupState>;
+ inFlight: number;
+}
+
+interface QueueState {
+ groups: Record<string, GroupState>;
inFlight: number;
}
+interface GroupState {
+ id: string;
+ items: QueueItem[];
+ lastServedTimestamp: number;
+}
+
export const semaphore = object({
name: "Semaphore",
handlers: {
acquire: async (
- ctx: ObjectContext<QueueState>,
- req: { awakeableId: string; priority: number; capacity: number },
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ awakeableId: string;
+ priority: number;
+ capacity: number;
+ groupId?: string;
+ },
): Promise<void> => {
const state = await getState(ctx);
+ req.groupId = req.groupId ?? "__ungrouped__";
+
+ if (state.groups[req.groupId] === undefined) {
+ state.groups[req.groupId] = {
+ id: req.groupId,
+ items: [],
+ lastServedTimestamp: Date.now(),
+ };
+ }
- state.items.push({
+ state.groups[req.groupId].items.push({
awakeable: req.awakeableId,
priority: req.priority,
});
@@ -33,7 +59,7 @@ export const semaphore = object({
},
release: async (
- ctx: ObjectContext<QueueState>,
+ ctx: ObjectContext<LegacyQueueState>,
capacity: number,
): Promise<void> => {
const state = await getState(ctx);
@@ -49,40 +75,84 @@ export const semaphore = object({
});
// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(items: QueueItem[]): QueueItem {
- let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 };
- for (const [i, item] of items.entries()) {
- if (item.priority < selected.priority) {
- selected.priority = item.priority;
- selected.index = i;
+function selectAndPopItem(state: QueueState): {
+ item: QueueItem;
+ groupId: string;
+} {
+ let selected: {
+ priority: number;
+ groupId: string;
+ index: number;
+ groupLastServedTimestamp: number;
+ } = {
+ priority: Number.MAX_SAFE_INTEGER,
+ groupId: "",
+ index: 0,
+ groupLastServedTimestamp: 0,
+ };
+
+ for (const [groupId, group] of Object.entries(state.groups)) {
+ for (const [i, item] of group.items.entries()) {
+ if (item.priority < selected.priority) {
+ selected.priority = item.priority;
+ selected.groupId = groupId;
+ selected.index = i;
+ selected.groupLastServedTimestamp = group.lastServedTimestamp;
+ } else if (item.priority === selected.priority) {
+ if (group.lastServedTimestamp < selected.groupLastServedTimestamp) {
+ selected.priority = item.priority;
+ selected.groupId = groupId;
+ selected.index = i;
+ selected.groupLastServedTimestamp = group.lastServedTimestamp;
+ }
+ }
}
}
- const [item] = items.splice(selected.index, 1);
- return item;
+
+ const [item] = state.groups[selected.groupId].items.splice(selected.index, 1);
+ state.groups[selected.groupId].lastServedTimestamp = Date.now();
+ if (state.groups[selected.groupId].items.length === 0) {
+ delete state.groups[selected.groupId];
+ }
+ return { item, groupId: selected.groupId };
}
function tick(
- ctx: ObjectContext<QueueState>,
+ ctx: ObjectContext<LegacyQueueState>,
state: QueueState,
capacity: number,
) {
- while (state.inFlight < capacity && state.items.length > 0) {
- const item = selectAndPopItem(state.items);
+ while (state.inFlight < capacity && Object.keys(state.groups).length > 0) {
+ const { item } = selectAndPopItem(state);
state.inFlight++;
ctx.resolveAwakeable(item.awakeable);
}
}
-async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
+async function getState(
+ ctx: ObjectContext<LegacyQueueState>,
+): Promise<QueueState> {
+ const groups = (await ctx.get("itemsv2")) ?? {};
+ const items = (await ctx.get("items")) ?? [];
+
+ if (items.length > 0) {
+ groups["__legacy__"] = {
+ id: "__legacy__",
+ items,
+ lastServedTimestamp: 0,
+ };
+ }
+
return {
- items: (await ctx.get("items")) ?? [],
+ groups,
inFlight: (await ctx.get("inFlight")) ?? 0,
};
}
-function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
- ctx.set("items", state.items);
+function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
+ ctx.set("itemsv2", state.groups);
ctx.set("inFlight", state.inFlight);
+ ctx.clear("items");
}
export class RestateSemaphore {
@@ -92,7 +162,7 @@ export class RestateSemaphore {
private readonly capacity: number,
) {}
- async acquire(priority: number) {
+ async acquire(priority: number, groupId?: string) {
const awk = this.ctx.awakeable();
await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
@@ -100,6 +170,7 @@ export class RestateSemaphore {
awakeableId: awk.id,
priority,
capacity: this.capacity,
+ groupId,
});
try {