import * as restate from "@restatedev/restate-sdk"; import type { RunnerFuncs, RunnerOptions } from "@karakeep/shared/queueing"; import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { tryCatch } from "@karakeep/shared/tryCatch"; import type { RunnerJobData, RunnerResult, SerializedError } from "./types"; function serializeError(error: Error): SerializedError { return { name: error.name, message: error.message, stack: error.stack, }; } export function runnerServiceName(queueName: string): string { return `${queueName}-runner`; } export function buildRunnerService( queueName: string, funcs: RunnerFuncs, opts: RunnerOptions, ) { return restate.service({ name: runnerServiceName(queueName), options: { ingressPrivate: true, inactivityTimeout: { seconds: opts.timeoutSecs * 2, }, // No retries at runner level - dispatcher handles retry logic retryPolicy: { maxAttempts: 1, }, journalRetention: { days: 3, }, }, handlers: { run: async ( ctx: restate.Context, jobData: RunnerJobData, ): Promise> => { // Validate payload if validator provided let payload = jobData.data; if (opts.validator) { const res = opts.validator.safeParse(jobData.data); if (!res.success) { return { type: "error", error: { name: "ValidationError", message: res.error.message, }, }; } payload = res.data; } const res = await tryCatch( ctx .run( "main logic", async () => { const result = await tryCatch( funcs.run({ id: jobData.id, data: payload, priority: jobData.priority, runNumber: jobData.runNumber, abortSignal: AbortSignal.timeout( jobData.timeoutSecs * 1000, ), }), ); if (result.error) { if (result.error instanceof QueueRetryAfterError) { return { type: "rate_limit" as const, delayMs: result.error.delayMs, }; } throw result.error; } return { type: "success" as const, value: result.data }; }, { maxRetryAttempts: 1, }, ) .orTimeout({ seconds: jobData.timeoutSecs * 1.1, }), ); if (res.error) { return { type: "error", error: serializeError(res.error), }; } return res.data as RunnerResult; }, onCompleted: async ( ctx: restate.Context, data: { job: RunnerJobData; result: R }, ): Promise => { await ctx.run( "onComplete", async () => { await funcs.onComplete?.( { id: data.job.id, data: data.job.data, priority: data.job.priority, runNumber: data.job.runNumber, abortSignal: AbortSignal.timeout(data.job.timeoutSecs * 1000), }, data.result, ); }, { maxRetryAttempts: 1, }, ); }, onError: async ( ctx: restate.Context, data: { job: RunnerJobData; error: SerializedError }, ): Promise => { // Reconstruct the error const reconstructedError = Object.assign( new Error(data.error.message), { name: data.error.name, stack: data.error.stack, }, ); await ctx.run( "onError", async () => { await funcs.onError?.({ id: data.job.id, data: data.job.data, priority: data.job.priority, runNumber: data.job.runNumber, numRetriesLeft: data.job.numRetriesLeft, error: reconstructedError, }); }, { maxRetryAttempts: 1, }, ); }, }, }); }