aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-12-20 13:27:51 +0000
committerMohamed Bassem <me@mbassem.com>2025-12-20 13:27:51 +0000
commit58eb6c0054e87a47457f0390319c0ca91b2714a5 (patch)
tree6ec5be640d09b847855f4a4da98bb4e343da61dc /packages/plugins/queue-restate/src
parent837dea5e1bc2e8daaf1f076948fc3384a07e28db (diff)
downloadkarakeep-58eb6c0054e87a47457f0390319c0ca91b2714a5.tar.zst
feat: add more restate semaphore controls
Diffstat (limited to 'packages/plugins/queue-restate/src')
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts154
1 files changed, 97 insertions, 57 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 694c77b3..451e7251 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -10,14 +10,15 @@ interface QueueItem {
}
interface LegacyQueueState {
- items: QueueItem[];
itemsv2: Record<string, GroupState>;
inFlight: number;
+ paused: boolean;
}
interface QueueState {
groups: Record<string, GroupState>;
inFlight: number;
+ paused: boolean;
}
interface GroupState {
@@ -29,59 +30,101 @@ interface GroupState {
export const semaphore = object({
name: "Semaphore",
handlers: {
- acquire: async (
- ctx: ObjectContext<LegacyQueueState>,
- req: {
- awakeableId: string;
- priority: number;
- capacity: number;
- groupId?: string;
- idempotencyKey?: string;
+ acquire: restate.handlers.object.exclusive(
+ {
+ ingressPrivate: true,
},
- ): Promise<boolean> => {
- const state = await getState(ctx);
-
- if (
- req.idempotencyKey &&
- idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
- ) {
- return false;
- }
+ async (
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ awakeableId: string;
+ priority: number;
+ capacity: number;
+ groupId?: string;
+ idempotencyKey?: string;
+ },
+ ): Promise<boolean> => {
+ const state = await getState(ctx);
- req.groupId = req.groupId ?? "__ungrouped__";
+ if (
+ req.idempotencyKey &&
+ idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
+ ) {
+ return false;
+ }
- if (state.groups[req.groupId] === undefined) {
- state.groups[req.groupId] = {
- id: req.groupId,
- items: [],
- lastServedTimestamp: Date.now(),
- };
- }
+ req.groupId = req.groupId ?? "__ungrouped__";
- state.groups[req.groupId].items.push({
- awakeable: req.awakeableId,
- priority: req.priority,
- idempotencyKey: req.idempotencyKey,
- });
+ if (state.groups[req.groupId] === undefined) {
+ state.groups[req.groupId] = {
+ id: req.groupId,
+ items: [],
+ lastServedTimestamp: Date.now(),
+ };
+ }
- tick(ctx, state, req.capacity);
+ state.groups[req.groupId].items.push({
+ awakeable: req.awakeableId,
+ priority: req.priority,
+ idempotencyKey: req.idempotencyKey,
+ });
- setState(ctx, state);
- return true;
- },
-
- release: async (
- ctx: ObjectContext<LegacyQueueState>,
- capacity: number,
- ): Promise<void> => {
- const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
- setState(ctx, state);
- },
+ tick(ctx, state, req.capacity);
+
+ setState(ctx, state);
+ return true;
+ },
+ ),
+
+ release: restate.handlers.object.exclusive(
+ {
+ ingressPrivate: true,
+ },
+ async (
+ ctx: ObjectContext<LegacyQueueState>,
+ capacity: number,
+ ): Promise<void> => {
+ const state = await getState(ctx);
+ state.inFlight--;
+ tick(ctx, state, capacity);
+ setState(ctx, state);
+ },
+ ),
+ pause: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.paused = true;
+ setState(ctx, state);
+ },
+ ),
+ resume: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.paused = false;
+ tick(ctx, state, 1);
+ setState(ctx, state);
+ },
+ ),
+ resetInflight: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.inFlight = 0;
+ setState(ctx, state);
+ },
+ ),
+ tick: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ tick(ctx, state, 1);
+ setState(ctx, state);
+ },
+ ),
},
options: {
- ingressPrivate: true,
journalRetention: 0,
},
});
@@ -134,7 +177,11 @@ function tick(
state: QueueState,
capacity: number,
) {
- while (state.inFlight < capacity && Object.keys(state.groups).length > 0) {
+ while (
+ !state.paused &&
+ state.inFlight < capacity &&
+ Object.keys(state.groups).length > 0
+ ) {
const { item } = selectAndPopItem(state);
state.inFlight++;
ctx.resolveAwakeable(item.awakeable);
@@ -145,18 +192,11 @@ async function getState(
ctx: ObjectContext<LegacyQueueState>,
): Promise<QueueState> {
const groups = (await ctx.get("itemsv2")) ?? {};
- const items = (await ctx.get("items")) ?? [];
-
- if (items.length > 0) {
- groups["__legacy__"] = {
- id: "__legacy__",
- items,
- lastServedTimestamp: 0,
- };
- }
+ const paused = (await ctx.get("paused")) ?? false;
return {
groups,
+ paused,
inFlight: (await ctx.get("inFlight")) ?? 0,
};
}
@@ -176,7 +216,7 @@ function idempotencyKeyAlreadyExists(
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
ctx.set("inFlight", state.inFlight);
- ctx.clear("items");
+ ctx.set("paused", state.paused);
}
export class RestateSemaphore {