diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-01-10 15:31:30 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-10 15:31:30 +0000 |
| commit | f48e98e16ae588ee5004531bf9a5aed757ed3786 (patch) | |
| tree | fc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/semaphore.ts | |
| parent | aace8864d7eab5c858a92064b0ac59c122377830 (diff) | |
| download | karakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst | |
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes
* fix: guard meilisearch client init with mutex
* fix: fix propagation of last error in restate
* fix: don't fail invocations when the job fails
* fix: add a timeout around the worker runner logic
* fix: add leases to handle dangling semaphores
* feat: separate dispatchers and runners
* add a test
* fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src/semaphore.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 71 |
1 files changed, 48 insertions, 23 deletions
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 451e7251..72fa938a 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -7,18 +7,20 @@ interface QueueItem { awakeable: string; idempotencyKey?: string; priority: number; + leaseDurationMs: number; } interface LegacyQueueState { itemsv2: Record<string, GroupState>; inFlight: number; paused: boolean; + leases: Record<string, number>; } interface QueueState { groups: Record<string, GroupState>; - inFlight: number; paused: boolean; + leases: Record<string, number>; } interface GroupState { @@ -40,6 +42,7 @@ export const semaphore = object({ awakeableId: string; priority: number; capacity: number; + leaseDurationMs: number; groupId?: string; idempotencyKey?: string; }, @@ -59,7 +62,7 @@ export const semaphore = object({ state.groups[req.groupId] = { id: req.groupId, items: [], - lastServedTimestamp: Date.now(), + lastServedTimestamp: await ctx.date.now(), }; } @@ -67,9 +70,10 @@ export const semaphore = object({ awakeable: req.awakeableId, priority: req.priority, idempotencyKey: req.idempotencyKey, + leaseDurationMs: req.leaseDurationMs, }); - tick(ctx, state, req.capacity); + await tick(ctx, state, req.capacity); setState(ctx, state); return true; @@ -82,11 +86,14 @@ export const semaphore = object({ }, async ( ctx: ObjectContext<LegacyQueueState>, - capacity: number, + req: { + leaseId: string; + capacity: number; + }, ): Promise<void> => { const state = await getState(ctx); - state.inFlight--; - tick(ctx, state, capacity); + delete state.leases[req.leaseId]; + await tick(ctx, state, req.capacity); setState(ctx, state); }, ), @@ -103,7 +110,7 @@ export const semaphore = object({ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { const state = await getState(ctx); state.paused = false; - tick(ctx, state, 1); + await tick(ctx, state, 1); setState(ctx, state); }, ), @@ -111,7 +118,7 @@ export const semaphore = object({ {}, async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { const state = await getState(ctx); - state.inFlight = 0; + state.leases = {}; setState(ctx, state); }, ), @@ -119,7 +126,7 @@ export const semaphore = object({ {}, async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { const state = await getState(ctx); - tick(ctx, state, 1); + await tick(ctx, state, 1); setState(ctx, state); }, ), @@ -130,7 +137,10 @@ export const semaphore = object({ }); // Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(state: QueueState): { +function selectAndPopItem( + state: QueueState, + now: number, +): { item: QueueItem; groupId: string; } { @@ -165,25 +175,37 @@ function selectAndPopItem(state: QueueState): { } const [item] = state.groups[selected.groupId].items.splice(selected.index, 1); - state.groups[selected.groupId].lastServedTimestamp = Date.now(); + state.groups[selected.groupId].lastServedTimestamp = now; if (state.groups[selected.groupId].items.length === 0) { delete state.groups[selected.groupId]; } return { item, groupId: selected.groupId }; } -function tick( +function pruneExpiredLeases(state: QueueState, now: number) { + for (const [leaseId, expiry] of Object.entries(state.leases)) { + if (expiry <= now) { + delete state.leases[leaseId]; + } + } + return Object.keys(state.leases).length; +} + +async function tick( ctx: ObjectContext<LegacyQueueState>, state: QueueState, capacity: number, -) { +): Promise<void> { + let activeLeases = pruneExpiredLeases(state, await ctx.date.now()); while ( !state.paused && - state.inFlight < capacity && + activeLeases < capacity && Object.keys(state.groups).length > 0 ) { - const { item } = selectAndPopItem(state); - state.inFlight++; + const now = await ctx.date.now(); + const { item } = selectAndPopItem(state, now); + state.leases[item.awakeable] = now + item.leaseDurationMs; + activeLeases++; ctx.resolveAwakeable(item.awakeable); } } @@ -193,11 +215,12 @@ async function getState( ): Promise<QueueState> { const groups = (await ctx.get("itemsv2")) ?? {}; const paused = (await ctx.get("paused")) ?? false; + const leases = (await ctx.get("leases")) ?? {}; return { groups, paused, - inFlight: (await ctx.get("inFlight")) ?? 0, + leases, }; } @@ -215,7 +238,7 @@ function idempotencyKeyAlreadyExists( function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) { ctx.set("itemsv2", state.groups); - ctx.set("inFlight", state.inFlight); + ctx.set("leases", state.leases); ctx.set("paused", state.paused); } @@ -224,6 +247,7 @@ export class RestateSemaphore { private readonly ctx: Context, private readonly id: string, private readonly capacity: number, + private readonly leaseDurationMs: number, ) {} async acquire(priority: number, groupId?: string, idempotencyKey?: string) { @@ -234,6 +258,7 @@ export class RestateSemaphore { awakeableId: awk.id, priority, capacity: this.capacity, + leaseDurationMs: this.leaseDurationMs, groupId, idempotencyKey, }); @@ -246,15 +271,15 @@ export class RestateSemaphore { await awk.promise; } catch (e) { if (e instanceof restate.CancelledError) { - await this.release(); - throw e; + await this.release(awk.id); } + throw e; } - return true; + return awk.id; } - async release() { + async release(leaseId: string) { await this.ctx .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .release(this.capacity); + .release({ leaseId, capacity: this.capacity }); } } |
