From f48e98e16ae588ee5004531bf9a5aed757ed3786 Mon Sep 17 00:00:00 2001
From: Mohamed Bassem
Date: Sat, 10 Jan 2026 15:31:30 +0000
Subject: fix: harden the restate implementation (#2370)

* fix: parallelize queue enqueues in bookmark routes
* fix: guard meilisearch client init with mutex
* fix: fix propagation of last error in restate
* fix: don't fail invocations when the job fails
* fix: add a timeout around the worker runner logic
* fix: add leases to handle dangling semaphores
* feat: separate dispatchers and runners
* add a test
* fix silent promise failure
---
 packages/plugins/queue-restate/src/dispatcher.ts | 189 +++++++++++++++++++++++
 1 file changed, 189 insertions(+)
 create mode 100644 packages/plugins/queue-restate/src/dispatcher.ts

diff --git a/packages/plugins/queue-restate/src/dispatcher.ts b/packages/plugins/queue-restate/src/dispatcher.ts
new file mode 100644
index 00000000..6b48a571
--- /dev/null
+++ b/packages/plugins/queue-restate/src/dispatcher.ts
@@ -0,0 +1,189 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type {
+  Queue,
+  QueueOptions,
+  RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
+import { runnerServiceName } from "./runner";
+import { RestateSemaphore } from "./semaphore";
+
+export function buildDispatcherService<T, R>(
+  queue: Queue<T>,
+  opts: RunnerOptions<T, R>,
+  queueOpts: QueueOptions,
+) {
+  const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
+  const runnerName = runnerServiceName(queue.name());
+
+  // Type definition for the runner service client
+  // Note: ctx parameter is required for Restate SDK to correctly infer client method signatures
+  interface RunnerService {
+    run: (
+      ctx: restate.Context,
+      data: RunnerJobData<T>,
+    ) => Promise<RunnerResult<R>>;
+    onCompleted: (
+      ctx: restate.Context,
+      data: { job: RunnerJobData<T>; result: R },
+    ) => Promise<void>;
+    onError: (
+      ctx: restate.Context,
+      data: { job: RunnerJobData<T>; error: SerializedError },
+    ) => Promise<void>;
+  }
+
+  return restate.service({
+    name: queue.name(),
+    options: {
+      inactivityTimeout: {
+        seconds: opts.timeoutSecs * 2,
+      },
+      retryPolicy: {
+        maxAttempts: NUM_RETRIES,
+        initialInterval: {
+          seconds: 5,
+        },
+        maxInterval: {
+          minutes: 1,
+        },
+      },
+      journalRetention: {
+        days: 3,
+      },
+    },
+    handlers: {
+      run: async (
+        ctx: restate.Context,
+        data: {
+          payload: T;
+          queuedIdempotencyKey?: string;
+          priority: number;
+          groupId?: string;
+        },
+      ) => {
+        const id = ctx.rand.uuidv4();
+        const priority = data.priority ?? 0;
+        const logDebug = async (message: string) => {
+          await ctx.run(
+            "log",
+            async () => {
+              logger.debug(message);
+            },
+            {
+              maxRetryAttempts: 1,
+            },
+          );
+        };
+
+        const semaphore = new RestateSemaphore(
+          ctx,
+          `queue:${queue.name()}`,
+          opts.concurrency,
+          Math.ceil(opts.timeoutSecs * 1.5 * 1000),
+        );
+
+        const runner = ctx.serviceClient<RunnerService>({ name: runnerName });
+
+        let runNumber = 0;
+        while (runNumber <= NUM_RETRIES) {
+          await logDebug(
+            `Dispatcher attempt ${runNumber} for queue ${queue.name()} job ${id} (priority=${priority}, groupId=${data.groupId ?? "none"})`,
"none"})`, + ); + const leaseId = await semaphore.acquire( + priority, + data.groupId, + data.queuedIdempotencyKey, + ); + if (!leaseId) { + // Idempotency key already exists, skip + await logDebug( + `Dispatcher skipping queue ${queue.name()} job ${id} due to existing idempotency key`, + ); + return; + } + await logDebug( + `Dispatcher acquired lease ${leaseId} for queue ${queue.name()} job ${id}`, + ); + + const jobData: RunnerJobData = { + id, + data: data.payload, + priority, + runNumber, + numRetriesLeft: NUM_RETRIES - runNumber, + timeoutSecs: opts.timeoutSecs, + }; + + // Call the runner service + const res = await tryCatch(runner.run(jobData)); + + // Handle RPC-level errors (e.g., runner service unavailable) + if (res.error) { + await logDebug( + `Dispatcher RPC error for queue ${queue.name()} job ${id}: ${res.error instanceof Error ? res.error.message : String(res.error)}`, + ); + await semaphore.release(leaseId); + if (res.error instanceof restate.CancelledError) { + throw res.error; + } + // Retry with backoff + await ctx.sleep(1000, "rpc error retry"); + runNumber++; + continue; + } + + const result = res.data; + + if (result.type === "rate_limit") { + // Rate limit - release semaphore, sleep, and retry without incrementing runNumber + await logDebug( + `Dispatcher rate limit for queue ${queue.name()} job ${id} (delayMs=${result.delayMs})`, + ); + await semaphore.release(leaseId); + await ctx.sleep(result.delayMs, "rate limit retry"); + continue; + } + + if (result.type === "error") { + // Call onError on the runner BEFORE releasing semaphore + // This ensures inFlight tracking stays consistent + await logDebug( + `Dispatcher runner error for queue ${queue.name()} job ${id}: ${result.error.message}`, + ); + await tryCatch( + runner.onError({ + job: jobData, + error: result.error, + }), + ); + await semaphore.release(leaseId); + + // Retry with backoff + await ctx.sleep(1000, "error retry"); + runNumber++; + continue; + } + + // Success - call onCompleted BEFORE releasing semaphore + // This ensures inFlight tracking stays consistent + await logDebug( + `Dispatcher completed queue ${queue.name()} job ${id}`, + ); + await tryCatch( + runner.onCompleted({ + job: jobData, + result: result.value, + }), + ); + await semaphore.release(leaseId); + break; + } + }, + }, + }); +} -- cgit v1.2.3-70-g09d2