aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-30 00:19:28 +0000
committerGitHub <noreply@github.com>2025-11-30 00:19:28 +0000
commita71b9505ea76596659c98eb6180a09a895399741 (patch)
treeceabe80e36d36f594e92c7b3823147abe6e6b474 /packages/plugins/queue-restate
parentb12c1c3a82941f2767ade8f497db56933415b94d (diff)
downloadkarakeep-a71b9505ea76596659c98eb6180a09a895399741.tar.zst
fix: Add restate queued idempotency (#2169)
* fix: Add restate queued idempotency * return on failed to acquire
Diffstat (limited to '')
-rw-r--r--packages/plugins/queue-restate/src/index.ts3
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts36
-rw-r--r--packages/plugins/queue-restate/src/service.ts10
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts21
4 files changed, 65 insertions, 5 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index de37bbea..fa636f87 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -42,6 +42,7 @@ class RestateQueueWrapper<T> implements Queue<T> {
payload: T;
priority: number;
groupId?: string;
+ queuedIdempotencyKey?: string;
},
) => Promise<void>;
}
@@ -51,6 +52,7 @@ class RestateQueueWrapper<T> implements Queue<T> {
payload,
priority: options?.priority ?? 0,
groupId: options?.groupId,
+ queuedIdempotencyKey: options?.idempotencyKey,
},
restateClient.rpc.sendOpts({
delay: options?.delayMs
@@ -58,7 +60,6 @@ class RestateQueueWrapper<T> implements Queue<T> {
milliseconds: options.delayMs,
}
: undefined,
- idempotencyKey: options?.idempotencyKey,
}),
);
return res.invocationId;
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 9acecf28..694c77b3 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -5,6 +5,7 @@ import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
interface QueueItem {
awakeable: string;
+ idempotencyKey?: string;
priority: number;
}
@@ -35,9 +36,18 @@ export const semaphore = object({
priority: number;
capacity: number;
groupId?: string;
+ idempotencyKey?: string;
},
- ): Promise<void> => {
+ ): Promise<boolean> => {
const state = await getState(ctx);
+
+ if (
+ req.idempotencyKey &&
+ idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
+ ) {
+ return false;
+ }
+
req.groupId = req.groupId ?? "__ungrouped__";
if (state.groups[req.groupId] === undefined) {
@@ -51,11 +61,13 @@ export const semaphore = object({
state.groups[req.groupId].items.push({
awakeable: req.awakeableId,
priority: req.priority,
+ idempotencyKey: req.idempotencyKey,
});
tick(ctx, state, req.capacity);
setState(ctx, state);
+ return true;
},
release: async (
@@ -149,6 +161,18 @@ async function getState(
};
}
+function idempotencyKeyAlreadyExists(
+ items: Record<string, GroupState>,
+ key: string,
+) {
+ for (const group of Object.values(items)) {
+ if (group.items.some((item) => item.idempotencyKey === key)) {
+ return true;
+ }
+ }
+ return false;
+}
+
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
ctx.set("inFlight", state.inFlight);
@@ -162,17 +186,22 @@ export class RestateSemaphore {
private readonly capacity: number,
) {}
- async acquire(priority: number, groupId?: string) {
+ async acquire(priority: number, groupId?: string, idempotencyKey?: string) {
const awk = this.ctx.awakeable();
- await this.ctx
+ const res = await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
.acquire({
awakeableId: awk.id,
priority,
capacity: this.capacity,
groupId,
+ idempotencyKey,
});
+ if (!res) {
+ return false;
+ }
+
try {
await awk.promise;
} catch (e) {
@@ -181,6 +210,7 @@ export class RestateSemaphore {
throw e;
}
}
+ return true;
}
async release() {
await this.ctx
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index b26f66cf..5ba7d1df 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -39,6 +39,7 @@ export function buildRestateService<T, R>(
ctx: restate.Context,
data: {
payload: T;
+ queuedIdempotencyKey?: string;
priority: number;
groupId?: string;
},
@@ -65,7 +66,14 @@ export function buildRestateService<T, R>(
let lastError: Error | undefined;
for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- await semaphore.acquire(priority, data.groupId);
+ const acquired = await semaphore.acquire(
+ priority,
+ data.groupId,
+ data.queuedIdempotencyKey,
+ );
+ if (!acquired) {
+ return;
+ }
const res = await runWorkerLogic(ctx, funcs, {
id,
data: payload,
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 28e32394..2085d57b 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -209,9 +209,30 @@ describe("Restate Queue Provider", () => {
it("should use idempotency key", async () => {
const idempotencyKey = `test-${Date.now()}`;
+ // hog the queue
+ await Promise.all([
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ queue.enqueue(
+ { type: "semaphore-acquire" },
+ { groupId: "init", priority: -10 },
+ ),
+ ]);
+ await testState.baton.waitUntilCountWaiting(3);
+
await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+
+ testState.baton.release();
+
await waitUntilQueueEmpty();
expect(testState.results).toEqual([200]);