author     Mohamed Bassem <me@mbassem.com>    2026-01-10 15:31:30 +0000
committer  GitHub <noreply@github.com>        2026-01-10 15:31:30 +0000
commit     f48e98e16ae588ee5004531bf9a5aed757ed3786 (patch)
tree       fc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/runner.ts
parent     aace8864d7eab5c858a92064b0ac59c122377830 (diff)
download   karakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes
* fix: guard meilisearch client init with mutex
* fix: fix propagation of last error in restate
* fix: don't fail invocations when the job fails
* fix: add a timeout around the worker runner logic
* fix: add leases to handle dangling semaphores
* feat: separate dispatchers and runners
* add a test
* fix silent promise failure
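One of the headline changes here is the dispatcher/runner split: the runner service added below executes each attempt exactly once (retryPolicy.maxAttempts: 1) and reports the outcome as a plain return value, so a failing job never fails the Restate invocation itself. As a rough sketch of the dispatcher side of that contract, assuming retries live entirely in the dispatcher (the dispatchWithRetries helper and RunnerClient interface are illustrative names, not code from this commit):

// Hypothetical dispatcher-side retry loop illustrating the split this
// commit introduces; not part of the commit itself.
import * as restate from "@restatedev/restate-sdk";

import type { RunnerJobData, RunnerResult } from "./types";

// Minimal view of a client for the runner service's `run` handler.
interface RunnerClient<T, R> {
  run(job: RunnerJobData<T>): Promise<RunnerResult<R>>;
}

async function dispatchWithRetries<T, R>(
  ctx: restate.Context,
  runner: RunnerClient<T, R>,
  job: Omit<RunnerJobData<T>, "runNumber" | "numRetriesLeft">,
  maxRetries: number,
): Promise<RunnerResult<R>> {
  let last: RunnerResult<R> | undefined;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    last = await runner.run({
      ...job,
      runNumber: attempt,
      numRetriesLeft: maxRetries - attempt,
    });
    if (last.type === "success") {
      return last;
    }
    if (attempt < maxRetries) {
      // Rate limits wait for the delay the job asked for; plain errors
      // back off exponentially. Either way the runner invocation itself
      // succeeded, so Restate does not retry it; the dispatcher does.
      const delayMs =
        last.type === "rate_limit" ? last.delayMs : 1_000 * 2 ** attempt;
      await ctx.sleep(delayMs);
    }
  }
  // Attempts exhausted: surface the last error, not the first, matching
  // the "fix propagation of last error" item above.
  return last!;
}

The ingressPrivate: true option on the runner fits this shape: the runner cannot be reached through Restate's public ingress, only by other services such as the dispatcher.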
Diffstat (limited to 'packages/plugins/queue-restate/src/runner.ts')
-rw-r--r--  packages/plugins/queue-restate/src/runner.ts  164
1 file changed, 164 insertions, 0 deletions
diff --git a/packages/plugins/queue-restate/src/runner.ts b/packages/plugins/queue-restate/src/runner.ts
new file mode 100644
index 00000000..2bda9f72
--- /dev/null
+++ b/packages/plugins/queue-restate/src/runner.ts
@@ -0,0 +1,164 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type { RunnerFuncs, RunnerOptions } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
+
+function serializeError(error: Error): SerializedError {
+ return {
+ name: error.name,
+ message: error.message,
+ stack: error.stack,
+ };
+}
+
+export function runnerServiceName(queueName: string): string {
+ return `${queueName}-runner`;
+}
+
+export function buildRunnerService<T, R>(
+ queueName: string,
+ funcs: RunnerFuncs<T, R>,
+ opts: RunnerOptions<T>,
+) {
+ return restate.service({
+ name: runnerServiceName(queueName),
+ options: {
+ ingressPrivate: true,
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs * 2,
+ },
+ // No retries at runner level - dispatcher handles retry logic
+ retryPolicy: {
+ maxAttempts: 1,
+ },
+ journalRetention: {
+ days: 3,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ jobData: RunnerJobData<T>,
+ ): Promise<RunnerResult<R>> => {
+ // Validate payload if validator provided
+ let payload = jobData.data;
+ if (opts.validator) {
+ const res = opts.validator.safeParse(jobData.data);
+ if (!res.success) {
+ return {
+ type: "error",
+ error: {
+ name: "ValidationError",
+ message: res.error.message,
+ },
+ };
+ }
+ payload = res.data;
+ }
+
+ const res = await tryCatch(
+ ctx
+ .run(
+ "main logic",
+ async () => {
+ const result = await tryCatch(
+ funcs.run({
+ id: jobData.id,
+ data: payload,
+ priority: jobData.priority,
+ runNumber: jobData.runNumber,
+ abortSignal: AbortSignal.timeout(
+ jobData.timeoutSecs * 1000,
+ ),
+ }),
+ );
+ if (result.error) {
+ if (result.error instanceof QueueRetryAfterError) {
+ return {
+ type: "rate_limit" as const,
+ delayMs: result.error.delayMs,
+ };
+ }
+ throw result.error;
+ }
+ return { type: "success" as const, value: result.data };
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ )
+ .orTimeout({
+ seconds: jobData.timeoutSecs * 1.1,
+ }),
+ );
+
+ if (res.error) {
+ return {
+ type: "error",
+ error: serializeError(res.error),
+ };
+ }
+
+ return res.data as RunnerResult<R>;
+ },
+
+ onCompleted: async (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; result: R },
+ ): Promise<void> => {
+ await ctx.run(
+ "onComplete",
+ async () => {
+ await funcs.onComplete?.(
+ {
+ id: data.job.id,
+ data: data.job.data,
+ priority: data.job.priority,
+ runNumber: data.job.runNumber,
+ abortSignal: AbortSignal.timeout(data.job.timeoutSecs * 1000),
+ },
+ data.result,
+ );
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ },
+
+ onError: async (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; error: SerializedError },
+ ): Promise<void> => {
+ // Reconstruct the error
+ const reconstructedError = Object.assign(
+ new Error(data.error.message),
+ {
+ name: data.error.name,
+ stack: data.error.stack,
+ },
+ );
+
+ await ctx.run(
+ "onError",
+ async () => {
+ await funcs.onError?.({
+ id: data.job.id,
+ data: data.job.data,
+ priority: data.job.priority,
+ runNumber: data.job.runNumber,
+ numRetriesLeft: data.job.numRetriesLeft,
+ error: reconstructedError,
+ });
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ },
+ },
+ });
+}
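
For orientation, here is roughly how a service built this way might be registered. The queue name, payload type, and handler body below are invented for illustration, the RunnerFuncs/RunnerOptions fields shown (run, timeoutSecs) are only the ones this file demonstrably reads, and restate.endpoint().bind(...).listen(...) is the Restate SDK's standard bootstrap call:

import * as restate from "@restatedev/restate-sdk";

import { buildRunnerService } from "./runner";

// Illustrative payload type; not part of the commit.
interface CrawlJob {
  url: string;
}

const crawlRunner = buildRunnerService<CrawlJob, string>(
  "crawl", // exposed as the "crawl-runner" service via runnerServiceName()
  {
    run: async (job) => {
      // Real work would honor job.abortSignal, which the runner wires to
      // AbortSignal.timeout(timeoutSecs * 1000) before invoking us.
      return `fetched ${job.data.url}`;
    },
  },
  {
    timeoutSecs: 60,
  },
);

// Serve the service over HTTP on port 9080 (standard Restate SDK bootstrap).
restate.endpoint().bind(crawlRunner).listen(9080);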