aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/semaphore.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-30 00:19:28 +0000
committerGitHub <noreply@github.com>2025-11-30 00:19:28 +0000
commita71b9505ea76596659c98eb6180a09a895399741 (patch)
treeceabe80e36d36f594e92c7b3823147abe6e6b474 /packages/plugins/queue-restate/src/semaphore.ts
parentb12c1c3a82941f2767ade8f497db56933415b94d (diff)
downloadkarakeep-a71b9505ea76596659c98eb6180a09a895399741.tar.zst
fix: Add restate queued idempotency (#2169)
* fix: Add restate queued idempotency * return on failed to acquire
Diffstat (limited to 'packages/plugins/queue-restate/src/semaphore.ts')
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts36
1 files changed, 33 insertions, 3 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 9acecf28..694c77b3 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -5,6 +5,7 @@ import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
interface QueueItem {
awakeable: string;
+ idempotencyKey?: string;
priority: number;
}
@@ -35,9 +36,18 @@ export const semaphore = object({
priority: number;
capacity: number;
groupId?: string;
+ idempotencyKey?: string;
},
- ): Promise<void> => {
+ ): Promise<boolean> => {
const state = await getState(ctx);
+
+ if (
+ req.idempotencyKey &&
+ idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
+ ) {
+ return false;
+ }
+
req.groupId = req.groupId ?? "__ungrouped__";
if (state.groups[req.groupId] === undefined) {
@@ -51,11 +61,13 @@ export const semaphore = object({
state.groups[req.groupId].items.push({
awakeable: req.awakeableId,
priority: req.priority,
+ idempotencyKey: req.idempotencyKey,
});
tick(ctx, state, req.capacity);
setState(ctx, state);
+ return true;
},
release: async (
@@ -149,6 +161,18 @@ async function getState(
};
}
+function idempotencyKeyAlreadyExists(
+ items: Record<string, GroupState>,
+ key: string,
+) {
+ for (const group of Object.values(items)) {
+ if (group.items.some((item) => item.idempotencyKey === key)) {
+ return true;
+ }
+ }
+ return false;
+}
+
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
ctx.set("inFlight", state.inFlight);
@@ -162,17 +186,22 @@ export class RestateSemaphore {
private readonly capacity: number,
) {}
- async acquire(priority: number, groupId?: string) {
+ async acquire(priority: number, groupId?: string, idempotencyKey?: string) {
const awk = this.ctx.awakeable();
- await this.ctx
+ const res = await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
.acquire({
awakeableId: awk.id,
priority,
capacity: this.capacity,
groupId,
+ idempotencyKey,
});
+ if (!res) {
+ return false;
+ }
+
try {
await awk.promise;
} catch (e) {
@@ -181,6 +210,7 @@ export class RestateSemaphore {
throw e;
}
}
+ return true;
}
async release() {
await this.ctx