From f48e98e16ae588ee5004531bf9a5aed757ed3786 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 10 Jan 2026 15:31:30 +0000 Subject: fix: harden the restate implementation (#2370) * fix: parallelize queue enqueues in bookmark routes * fix: guard meilisearch client init with mutex * fix: fix propagation of last error in restate * fix: don't fail invocations when the job fails * fix: add a timeout around the worker runner logic * fix: add leases to handle dangling semaphores * feat: separate dispatchers and runners * add a test * fix silent promise failure --- packages/plugins/queue-restate/src/runner.ts | 164 +++++++++++++++++++++++++++ 1 file changed, 164 insertions(+) create mode 100644 packages/plugins/queue-restate/src/runner.ts (limited to 'packages/plugins/queue-restate/src/runner.ts') diff --git a/packages/plugins/queue-restate/src/runner.ts b/packages/plugins/queue-restate/src/runner.ts new file mode 100644 index 00000000..2bda9f72 --- /dev/null +++ b/packages/plugins/queue-restate/src/runner.ts @@ -0,0 +1,164 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { RunnerFuncs, RunnerOptions } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import type { RunnerJobData, RunnerResult, SerializedError } from "./types"; + +function serializeError(error: Error): SerializedError { + return { + name: error.name, + message: error.message, + stack: error.stack, + }; +} + +export function runnerServiceName(queueName: string): string { + return `${queueName}-runner`; +} + +export function buildRunnerService( + queueName: string, + funcs: RunnerFuncs, + opts: RunnerOptions, +) { + return restate.service({ + name: runnerServiceName(queueName), + options: { + ingressPrivate: true, + inactivityTimeout: { + seconds: opts.timeoutSecs * 2, + }, + // No retries at runner level - dispatcher handles retry logic + retryPolicy: { + maxAttempts: 1, + }, + journalRetention: { + days: 3, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + jobData: RunnerJobData, + ): Promise> => { + // Validate payload if validator provided + let payload = jobData.data; + if (opts.validator) { + const res = opts.validator.safeParse(jobData.data); + if (!res.success) { + return { + type: "error", + error: { + name: "ValidationError", + message: res.error.message, + }, + }; + } + payload = res.data; + } + + const res = await tryCatch( + ctx + .run( + "main logic", + async () => { + const result = await tryCatch( + funcs.run({ + id: jobData.id, + data: payload, + priority: jobData.priority, + runNumber: jobData.runNumber, + abortSignal: AbortSignal.timeout( + jobData.timeoutSecs * 1000, + ), + }), + ); + if (result.error) { + if (result.error instanceof QueueRetryAfterError) { + return { + type: "rate_limit" as const, + delayMs: result.error.delayMs, + }; + } + throw result.error; + } + return { type: "success" as const, value: result.data }; + }, + { + maxRetryAttempts: 1, + }, + ) + .orTimeout({ + seconds: jobData.timeoutSecs * 1.1, + }), + ); + + if (res.error) { + return { + type: "error", + error: serializeError(res.error), + }; + } + + return res.data as RunnerResult; + }, + + onCompleted: async ( + ctx: restate.Context, + data: { job: RunnerJobData; result: R }, + ): Promise => { + await ctx.run( + "onComplete", + async () => { + await funcs.onComplete?.( + { + id: data.job.id, + data: data.job.data, + priority: data.job.priority, + runNumber: data.job.runNumber, + abortSignal: AbortSignal.timeout(data.job.timeoutSecs * 1000), + }, + data.result, + ); + }, + { + maxRetryAttempts: 1, + }, + ); + }, + + onError: async ( + ctx: restate.Context, + data: { job: RunnerJobData; error: SerializedError }, + ): Promise => { + // Reconstruct the error + const reconstructedError = Object.assign( + new Error(data.error.message), + { + name: data.error.name, + stack: data.error.stack, + }, + ); + + await ctx.run( + "onError", + async () => { + await funcs.onError?.({ + id: data.job.id, + data: data.job.data, + priority: data.job.priority, + runNumber: data.job.runNumber, + numRetriesLeft: data.job.numRetriesLeft, + error: reconstructedError, + }); + }, + { + maxRetryAttempts: 1, + }, + ); + }, + }, + }); +} -- cgit v1.2.3-70-g09d2