aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/semaphore.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-01-10 15:31:30 +0000
committerGitHub <noreply@github.com>2026-01-10 15:31:30 +0000
commitf48e98e16ae588ee5004531bf9a5aed757ed3786 (patch)
treefc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/semaphore.ts
parentaace8864d7eab5c858a92064b0ac59c122377830 (diff)
downloadkarakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes * fix: guard meilisearch client init with mutex * fix: fix propagation of last error in restate * fix: don't fail invocations when the job fails * fix: add a timeout around the worker runner logic * fix: add leases to handle dangling semaphores * feat: separate dispatchers and runners * add a test * fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src/semaphore.ts')
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts71
1 files changed, 48 insertions, 23 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 451e7251..72fa938a 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -7,18 +7,20 @@ interface QueueItem {
awakeable: string;
idempotencyKey?: string;
priority: number;
+ leaseDurationMs: number;
}
interface LegacyQueueState {
itemsv2: Record<string, GroupState>;
inFlight: number;
paused: boolean;
+ leases: Record<string, number>;
}
interface QueueState {
groups: Record<string, GroupState>;
- inFlight: number;
paused: boolean;
+ leases: Record<string, number>;
}
interface GroupState {
@@ -40,6 +42,7 @@ export const semaphore = object({
awakeableId: string;
priority: number;
capacity: number;
+ leaseDurationMs: number;
groupId?: string;
idempotencyKey?: string;
},
@@ -59,7 +62,7 @@ export const semaphore = object({
state.groups[req.groupId] = {
id: req.groupId,
items: [],
- lastServedTimestamp: Date.now(),
+ lastServedTimestamp: await ctx.date.now(),
};
}
@@ -67,9 +70,10 @@ export const semaphore = object({
awakeable: req.awakeableId,
priority: req.priority,
idempotencyKey: req.idempotencyKey,
+ leaseDurationMs: req.leaseDurationMs,
});
- tick(ctx, state, req.capacity);
+ await tick(ctx, state, req.capacity);
setState(ctx, state);
return true;
@@ -82,11 +86,14 @@ export const semaphore = object({
},
async (
ctx: ObjectContext<LegacyQueueState>,
- capacity: number,
+ req: {
+ leaseId: string;
+ capacity: number;
+ },
): Promise<void> => {
const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
+ delete state.leases[req.leaseId];
+ await tick(ctx, state, req.capacity);
setState(ctx, state);
},
),
@@ -103,7 +110,7 @@ export const semaphore = object({
async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
const state = await getState(ctx);
state.paused = false;
- tick(ctx, state, 1);
+ await tick(ctx, state, 1);
setState(ctx, state);
},
),
@@ -111,7 +118,7 @@ export const semaphore = object({
{},
async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
const state = await getState(ctx);
- state.inFlight = 0;
+ state.leases = {};
setState(ctx, state);
},
),
@@ -119,7 +126,7 @@ export const semaphore = object({
{},
async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
const state = await getState(ctx);
- tick(ctx, state, 1);
+ await tick(ctx, state, 1);
setState(ctx, state);
},
),
@@ -130,7 +137,10 @@ export const semaphore = object({
});
// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(state: QueueState): {
+function selectAndPopItem(
+ state: QueueState,
+ now: number,
+): {
item: QueueItem;
groupId: string;
} {
@@ -165,25 +175,37 @@ function selectAndPopItem(state: QueueState): {
}
const [item] = state.groups[selected.groupId].items.splice(selected.index, 1);
- state.groups[selected.groupId].lastServedTimestamp = Date.now();
+ state.groups[selected.groupId].lastServedTimestamp = now;
if (state.groups[selected.groupId].items.length === 0) {
delete state.groups[selected.groupId];
}
return { item, groupId: selected.groupId };
}
-function tick(
+function pruneExpiredLeases(state: QueueState, now: number) {
+ for (const [leaseId, expiry] of Object.entries(state.leases)) {
+ if (expiry <= now) {
+ delete state.leases[leaseId];
+ }
+ }
+ return Object.keys(state.leases).length;
+}
+
+async function tick(
ctx: ObjectContext<LegacyQueueState>,
state: QueueState,
capacity: number,
-) {
+): Promise<void> {
+ let activeLeases = pruneExpiredLeases(state, await ctx.date.now());
while (
!state.paused &&
- state.inFlight < capacity &&
+ activeLeases < capacity &&
Object.keys(state.groups).length > 0
) {
- const { item } = selectAndPopItem(state);
- state.inFlight++;
+ const now = await ctx.date.now();
+ const { item } = selectAndPopItem(state, now);
+ state.leases[item.awakeable] = now + item.leaseDurationMs;
+ activeLeases++;
ctx.resolveAwakeable(item.awakeable);
}
}
@@ -193,11 +215,12 @@ async function getState(
): Promise<QueueState> {
const groups = (await ctx.get("itemsv2")) ?? {};
const paused = (await ctx.get("paused")) ?? false;
+ const leases = (await ctx.get("leases")) ?? {};
return {
groups,
paused,
- inFlight: (await ctx.get("inFlight")) ?? 0,
+ leases,
};
}
@@ -215,7 +238,7 @@ function idempotencyKeyAlreadyExists(
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
- ctx.set("inFlight", state.inFlight);
+ ctx.set("leases", state.leases);
ctx.set("paused", state.paused);
}
@@ -224,6 +247,7 @@ export class RestateSemaphore {
private readonly ctx: Context,
private readonly id: string,
private readonly capacity: number,
+ private readonly leaseDurationMs: number,
) {}
async acquire(priority: number, groupId?: string, idempotencyKey?: string) {
@@ -234,6 +258,7 @@ export class RestateSemaphore {
awakeableId: awk.id,
priority,
capacity: this.capacity,
+ leaseDurationMs: this.leaseDurationMs,
groupId,
idempotencyKey,
});
@@ -246,15 +271,15 @@ export class RestateSemaphore {
await awk.promise;
} catch (e) {
if (e instanceof restate.CancelledError) {
- await this.release();
- throw e;
+ await this.release(awk.id);
}
+ throw e;
}
- return true;
+ return awk.id;
}
- async release() {
+ async release(leaseId: string) {
await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .release(this.capacity);
+ .release({ leaseId, capacity: this.capacity });
}
}