aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src
diff options
context:
space:
mode:
author Mohamed Bassem <me@mbassem.com> 2026-01-10 15:31:30 +0000
committer GitHub <noreply@github.com> 2026-01-10 15:31:30 +0000
commit f48e98e16ae588ee5004531bf9a5aed757ed3786 (patch)
tree fc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src
parent aace8864d7eab5c858a92064b0ac59c122377830 (diff)
download karakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes
* fix: guard meilisearch client init with mutex
* fix: fix propagation of last error in restate
* fix: don't fail invocations when the job fails
* fix: add a timeout around the worker runner logic
* fix: add leases to handle dangling semaphores
* feat: separate dispatchers and runners
* add a test
* fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src')
-rw-r--r-- packages/plugins/queue-restate/src/dispatcher.ts 189
-rw-r--r-- packages/plugins/queue-restate/src/env.ts 3
-rw-r--r-- packages/plugins/queue-restate/src/index.ts 39
-rw-r--r-- packages/plugins/queue-restate/src/runner.ts 164
-rw-r--r-- packages/plugins/queue-restate/src/semaphore.ts 71
-rw-r--r-- packages/plugins/queue-restate/src/service.ts 190
-rw-r--r-- packages/plugins/queue-restate/src/tests/queue.test.ts 88
-rw-r--r-- packages/plugins/queue-restate/src/types.ts 78
8 files changed, 611 insertions, 211 deletions
diff --git a/packages/plugins/queue-restate/src/dispatcher.ts b/packages/plugins/queue-restate/src/dispatcher.ts
new file mode 100644
index 00000000..6b48a571
--- /dev/null
+++ b/packages/plugins/queue-restate/src/dispatcher.ts
@@ -0,0 +1,189 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type {
+ Queue,
+ QueueOptions,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
+import { runnerServiceName } from "./runner";
+import { RestateSemaphore } from "./semaphore";
+// Builds the dispatcher service for a queue: acquires a semaphore lease, calls the runner service, and retries on failure.
+export function buildDispatcherService<T, R>(
+ queue: Queue<T>,
+ opts: RunnerOptions<T>,
+ queueOpts: QueueOptions,
+) {
+ const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; // retries allowed after the first attempt
+ const runnerName = runnerServiceName(queue.name());
+
+ // Type definition for the runner service client
+ // Note: ctx parameter is required for Restate SDK to correctly infer client method signatures
+ interface RunnerService {
+ run: (
+ ctx: restate.Context,
+ data: RunnerJobData<T>,
+ ) => Promise<RunnerResult<R>>;
+ onCompleted: (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; result: R },
+ ) => Promise<void>;
+ onError: (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; error: SerializedError },
+ ) => Promise<void>;
+ }
+
+ return restate.service({
+ name: queue.name(),
+ options: {
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs * 2,
+ },
+ retryPolicy: {
+ maxAttempts: NUM_RETRIES,
+ initialInterval: {
+ seconds: 5,
+ },
+ maxInterval: {
+ minutes: 1,
+ },
+ },
+ journalRetention: {
+ days: 3,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ queuedIdempotencyKey?: string;
+ priority: number;
+ groupId?: string;
+ },
+ ) => {
+ const id = ctx.rand.uuidv4(); // generated once; every attempt of this job shares the id
+ const priority = data.priority ?? 0;
+ const logDebug = async (message: string) => { // debug log wrapped in a ctx.run step (presumably so replays don't re-log; confirm)
+ await ctx.run(
+ "log",
+ async () => {
+ logger.debug(message);
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ };
+
+ const semaphore = new RestateSemaphore(
+ ctx,
+ `queue:${queue.name()}`,
+ opts.concurrency,
+ Math.ceil(opts.timeoutSecs * 1.5 * 1000), // lease duration (ms): 1.5x the job timeout
+ );
+
+ const runner = ctx.serviceClient<RunnerService>({ name: runnerName });
+
+ let runNumber = 0;
+ while (runNumber <= NUM_RETRIES) { // runNumber advances only on RPC/job errors, not on rate limits
+ await logDebug(
+ `Dispatcher attempt ${runNumber} for queue ${queue.name()} job ${id} (priority=${priority}, groupId=${data.groupId ?? "none"})`,
+ );
+ const leaseId = await semaphore.acquire(
+ priority,
+ data.groupId,
+ data.queuedIdempotencyKey,
+ );
+ if (!leaseId) {
+ // Idempotency key already exists, skip
+ await logDebug(
+ `Dispatcher skipping queue ${queue.name()} job ${id} due to existing idempotency key`,
+ );
+ return;
+ }
+ await logDebug(
+ `Dispatcher acquired lease ${leaseId} for queue ${queue.name()} job ${id}`,
+ );
+
+ const jobData: RunnerJobData<T> = {
+ id,
+ data: data.payload,
+ priority,
+ runNumber,
+ numRetriesLeft: NUM_RETRIES - runNumber,
+ timeoutSecs: opts.timeoutSecs,
+ };
+
+ // Call the runner service
+ const res = await tryCatch(runner.run(jobData));
+
+ // Handle RPC-level errors (e.g., runner service unavailable)
+ if (res.error) {
+ await logDebug(
+ `Dispatcher RPC error for queue ${queue.name()} job ${id}: ${res.error instanceof Error ? res.error.message : String(res.error)}`,
+ );
+ await semaphore.release(leaseId);
+ if (res.error instanceof restate.CancelledError) {
+ throw res.error;
+ }
+ // Retry after a fixed 1s delay (counts as an attempt; no exponential backoff)
+ await ctx.sleep(1000, "rpc error retry");
+ runNumber++;
+ continue;
+ }
+
+ const result = res.data;
+
+ if (result.type === "rate_limit") {
+ // Rate limit - release semaphore, sleep, and retry without incrementing runNumber
+ await logDebug(
+ `Dispatcher rate limit for queue ${queue.name()} job ${id} (delayMs=${result.delayMs})`,
+ );
+ await semaphore.release(leaseId);
+ await ctx.sleep(result.delayMs, "rate limit retry");
+ continue;
+ }
+
+ if (result.type === "error") {
+ // Call onError on the runner BEFORE releasing semaphore
+ // This ensures inFlight tracking stays consistent
+ await logDebug(
+ `Dispatcher runner error for queue ${queue.name()} job ${id}: ${result.error.message}`,
+ );
+ await tryCatch( // best-effort: the callback's outcome is discarded
+ runner.onError({
+ job: jobData,
+ error: result.error,
+ }),
+ );
+ await semaphore.release(leaseId);
+
+ // Retry after a fixed 1s delay (counts as an attempt; no exponential backoff)
+ await ctx.sleep(1000, "error retry");
+ runNumber++;
+ continue;
+ }
+
+ // Success - call onCompleted BEFORE releasing semaphore
+ // This ensures inFlight tracking stays consistent
+ await logDebug(
+ `Dispatcher completed queue ${queue.name()} job ${id}`,
+ );
+ await tryCatch( // best-effort: the callback's outcome is discarded
+ runner.onCompleted({
+ job: jobData,
+ result: result.value,
+ }),
+ );
+ await semaphore.release(leaseId);
+ break;
+ } // the handler returns without throwing even when every attempt failed
+ },
+ },
+ });
+}
diff --git a/packages/plugins/queue-restate/src/env.ts b/packages/plugins/queue-restate/src/env.ts
index abcfbe51..41003460 100644
--- a/packages/plugins/queue-restate/src/env.ts
+++ b/packages/plugins/queue-restate/src/env.ts
@@ -17,5 +17,8 @@ export const envConfig = z
RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"),
RESTATE_PUB_KEY: z.string().optional(),
RESTATE_EXPOSE_CORE_SERVICES: stringBool("true"),
+ // Deployment mode configuration - allows running dispatchers and runners separately
+ RESTATE_ENABLE_DISPATCHERS: stringBool("true"),
+ RESTATE_ENABLE_RUNNERS: stringBool("true"),
})
.parse(process.env);
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index 07082955..64572b03 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -17,7 +17,7 @@ import { AdminClient } from "./admin";
import { envConfig } from "./env";
import { idProvider } from "./idProvider";
import { semaphore } from "./semaphore";
-import { buildRestateService } from "./service";
+import { buildRestateServices } from "./service";
class RestateQueueWrapper<T> implements Queue<T> {
constructor(
@@ -87,12 +87,13 @@ class RestateQueueWrapper<T> implements Queue<T> {
class RestateRunnerWrapper<T> implements Runner<T> {
constructor(
- private readonly wf: restate.ServiceDefinition<
+ private readonly dispatcherDef: restate.ServiceDefinition<
string,
{
run: (ctx: restate.Context, data: T) => Promise<void>;
}
>,
+ private readonly runnerDef: restate.ServiceDefinition<string, unknown>,
) {}
async run(): Promise<void> {
@@ -107,8 +108,12 @@ class RestateRunnerWrapper<T> implements Runner<T> {
throw new Error("Method not implemented.");
}
- get def(): restate.WorkflowDefinition<string, unknown> {
- return this.wf;
+ get dispatcherService(): restate.ServiceDefinition<string, unknown> {
+ return this.dispatcherDef;
+ }
+
+ get runnerService(): restate.ServiceDefinition<string, unknown> {
+ return this.runnerDef;
}
}
@@ -130,14 +135,24 @@ class RestateQueueClient implements QueueClient {
}
async start(): Promise<void> {
+ const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = [];
+
+ for (const svc of this.services.values()) {
+ if (envConfig.RESTATE_ENABLE_DISPATCHERS) {
+ servicesToExpose.push(svc.dispatcherService);
+ }
+ if (envConfig.RESTATE_ENABLE_RUNNERS) {
+ servicesToExpose.push(svc.runnerService);
+ }
+ }
+
+ if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) {
+ servicesToExpose.push(semaphore, idProvider);
+ }
+
const port = await restate.serve({
port: envConfig.RESTATE_LISTEN_PORT ?? 0,
- services: [
- ...[...this.services.values()].map((svc) => svc.def),
- ...(envConfig.RESTATE_EXPOSE_CORE_SERVICES
- ? [semaphore, idProvider]
- : []),
- ],
+ services: servicesToExpose,
identityKeys: envConfig.RESTATE_PUB_KEY
? [envConfig.RESTATE_PUB_KEY]
: undefined,
@@ -176,8 +191,10 @@ class RestateQueueClient implements QueueClient {
if (wrapper) {
throw new Error(`Queue ${name} already exists`);
}
+ const services = buildRestateServices(queue, funcs, opts, queue.opts);
const svc = new RestateRunnerWrapper<T>(
- buildRestateService(queue, funcs, opts, queue.opts),
+ services.dispatcher,
+ services.runner,
);
this.services.set(name, svc);
return svc;
diff --git a/packages/plugins/queue-restate/src/runner.ts b/packages/plugins/queue-restate/src/runner.ts
new file mode 100644
index 00000000..2bda9f72
--- /dev/null
+++ b/packages/plugins/queue-restate/src/runner.ts
@@ -0,0 +1,164 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type { RunnerFuncs, RunnerOptions } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
+// Flattens an Error into the plain name/message/stack object returned inside an "error" RunnerResult.
+function serializeError(error: Error): SerializedError {
+ return {
+ name: error.name,
+ message: error.message,
+ stack: error.stack,
+ };
+}
+// Derives the runner service's name from the queue name by appending "-runner".
+export function runnerServiceName(queueName: string): string {
+ return `${queueName}-runner`;
+}
+// Builds the runner service for a queue: it executes funcs.run and the onComplete/onError callbacks, one handler each.
+export function buildRunnerService<T, R>(
+ queueName: string,
+ funcs: RunnerFuncs<T, R>,
+ opts: RunnerOptions<T>,
+) {
+ return restate.service({
+ name: runnerServiceName(queueName),
+ options: {
+ ingressPrivate: true,
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs * 2,
+ },
+ // No retries at runner level - dispatcher handles retry logic
+ retryPolicy: {
+ maxAttempts: 1,
+ },
+ journalRetention: {
+ days: 3,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ jobData: RunnerJobData<T>,
+ ): Promise<RunnerResult<R>> => {
+ // Validate payload if validator provided
+ let payload = jobData.data;
+ if (opts.validator) {
+ const res = opts.validator.safeParse(jobData.data);
+ if (!res.success) {
+ return { // validation failures are returned as an error result, not thrown
+ type: "error",
+ error: {
+ name: "ValidationError",
+ message: res.error.message,
+ },
+ };
+ }
+ payload = res.data;
+ }
+
+ const res = await tryCatch(
+ ctx
+ .run(
+ "main logic",
+ async () => {
+ const result = await tryCatch(
+ funcs.run({
+ id: jobData.id,
+ data: payload,
+ priority: jobData.priority,
+ runNumber: jobData.runNumber,
+ abortSignal: AbortSignal.timeout(
+ jobData.timeoutSecs * 1000,
+ ),
+ }),
+ );
+ if (result.error) {
+ if (result.error instanceof QueueRetryAfterError) {
+ return {
+ type: "rate_limit" as const,
+ delayMs: result.error.delayMs,
+ };
+ }
+ throw result.error; // caught by the outer tryCatch and returned as an error result
+ }
+ return { type: "success" as const, value: result.data };
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ )
+ .orTimeout({ // outer deadline: 10% longer than the abort-signal timeout above
+ seconds: jobData.timeoutSecs * 1.1,
+ }),
+ );
+
+ if (res.error) {
+ return {
+ type: "error",
+ error: serializeError(res.error),
+ };
+ }
+
+ return res.data as RunnerResult<R>;
+ },
+
+ onCompleted: async (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; result: R },
+ ): Promise<void> => {
+ await ctx.run(
+ "onComplete",
+ async () => {
+ await funcs.onComplete?.(
+ {
+ id: data.job.id,
+ data: data.job.data,
+ priority: data.job.priority,
+ runNumber: data.job.runNumber,
+ abortSignal: AbortSignal.timeout(data.job.timeoutSecs * 1000),
+ },
+ data.result,
+ );
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ },
+
+ onError: async (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; error: SerializedError },
+ ): Promise<void> => {
+ // Rebuild an Error instance from the serialized name/message/stack
+ const reconstructedError = Object.assign(
+ new Error(data.error.message),
+ {
+ name: data.error.name,
+ stack: data.error.stack,
+ },
+ );
+
+ await ctx.run(
+ "onError",
+ async () => {
+ await funcs.onError?.({
+ id: data.job.id,
+ data: data.job.data,
+ priority: data.job.priority,
+ runNumber: data.job.runNumber,
+ numRetriesLeft: data.job.numRetriesLeft,
+ error: reconstructedError,
+ });
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ },
+ },
+ });
+}
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 451e7251..72fa938a 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -7,18 +7,20 @@ interface QueueItem {
awakeable: string;
idempotencyKey?: string;
priority: number;
+ leaseDurationMs: number;
}
interface LegacyQueueState {
itemsv2: Record<string, GroupState>;
inFlight: number;
paused: boolean;
+ leases: Record<string, number>;
}
interface QueueState {
groups: Record<string, GroupState>;
- inFlight: number;
paused: boolean;
+ leases: Record<string, number>;
}
interface GroupState {
@@ -40,6 +42,7 @@ export const semaphore = object({
awakeableId: string;
priority: number;
capacity: number;
+ leaseDurationMs: number;
groupId?: string;
idempotencyKey?: string;
},
@@ -59,7 +62,7 @@ export const semaphore = object({
state.groups[req.groupId] = {
id: req.groupId,
items: [],
- lastServedTimestamp: Date.now(),
+ lastServedTimestamp: await ctx.date.now(),
};
}
@@ -67,9 +70,10 @@ export const semaphore = object({
awakeable: req.awakeableId,
priority: req.priority,
idempotencyKey: req.idempotencyKey,
+ leaseDurationMs: req.leaseDurationMs,
});
- tick(ctx, state, req.capacity);
+ await tick(ctx, state, req.capacity);
setState(ctx, state);
return true;
@@ -82,11 +86,14 @@ export const semaphore = object({
},
async (
ctx: ObjectContext<LegacyQueueState>,
- capacity: number,
+ req: {
+ leaseId: string;
+ capacity: number;
+ },
): Promise<void> => {
const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
+ delete state.leases[req.leaseId];
+ await tick(ctx, state, req.capacity);
setState(ctx, state);
},
),
@@ -103,7 +110,7 @@ export const semaphore = object({
async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
const state = await getState(ctx);
state.paused = false;
- tick(ctx, state, 1);
+ await tick(ctx, state, 1);
setState(ctx, state);
},
),
@@ -111,7 +118,7 @@ export const semaphore = object({
{},
async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
const state = await getState(ctx);
- state.inFlight = 0;
+ state.leases = {};
setState(ctx, state);
},
),
@@ -119,7 +126,7 @@ export const semaphore = object({
{},
async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
const state = await getState(ctx);
- tick(ctx, state, 1);
+ await tick(ctx, state, 1);
setState(ctx, state);
},
),
@@ -130,7 +137,10 @@ export const semaphore = object({
});
// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(state: QueueState): {
+function selectAndPopItem(
+ state: QueueState,
+ now: number,
+): {
item: QueueItem;
groupId: string;
} {
@@ -165,25 +175,37 @@ function selectAndPopItem(state: QueueState): {
}
const [item] = state.groups[selected.groupId].items.splice(selected.index, 1);
- state.groups[selected.groupId].lastServedTimestamp = Date.now();
+ state.groups[selected.groupId].lastServedTimestamp = now;
if (state.groups[selected.groupId].items.length === 0) {
delete state.groups[selected.groupId];
}
return { item, groupId: selected.groupId };
}
-function tick(
+function pruneExpiredLeases(state: QueueState, now: number) {
+ for (const [leaseId, expiry] of Object.entries(state.leases)) {
+ if (expiry <= now) {
+ delete state.leases[leaseId];
+ }
+ }
+ return Object.keys(state.leases).length;
+}
+
+async function tick(
ctx: ObjectContext<LegacyQueueState>,
state: QueueState,
capacity: number,
-) {
+): Promise<void> {
+ let activeLeases = pruneExpiredLeases(state, await ctx.date.now());
while (
!state.paused &&
- state.inFlight < capacity &&
+ activeLeases < capacity &&
Object.keys(state.groups).length > 0
) {
- const { item } = selectAndPopItem(state);
- state.inFlight++;
+ const now = await ctx.date.now();
+ const { item } = selectAndPopItem(state, now);
+ state.leases[item.awakeable] = now + item.leaseDurationMs;
+ activeLeases++;
ctx.resolveAwakeable(item.awakeable);
}
}
@@ -193,11 +215,12 @@ async function getState(
): Promise<QueueState> {
const groups = (await ctx.get("itemsv2")) ?? {};
const paused = (await ctx.get("paused")) ?? false;
+ const leases = (await ctx.get("leases")) ?? {};
return {
groups,
paused,
- inFlight: (await ctx.get("inFlight")) ?? 0,
+ leases,
};
}
@@ -215,7 +238,7 @@ function idempotencyKeyAlreadyExists(
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
- ctx.set("inFlight", state.inFlight);
+ ctx.set("leases", state.leases);
ctx.set("paused", state.paused);
}
@@ -224,6 +247,7 @@ export class RestateSemaphore {
private readonly ctx: Context,
private readonly id: string,
private readonly capacity: number,
+ private readonly leaseDurationMs: number,
) {}
async acquire(priority: number, groupId?: string, idempotencyKey?: string) {
@@ -234,6 +258,7 @@ export class RestateSemaphore {
awakeableId: awk.id,
priority,
capacity: this.capacity,
+ leaseDurationMs: this.leaseDurationMs,
groupId,
idempotencyKey,
});
@@ -246,15 +271,15 @@ export class RestateSemaphore {
await awk.promise;
} catch (e) {
if (e instanceof restate.CancelledError) {
- await this.release();
- throw e;
+ await this.release(awk.id);
}
+ throw e;
}
- return true;
+ return awk.id;
}
- async release() {
+ async release(leaseId: string) {
await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .release(this.capacity);
+ .release({ leaseId, capacity: this.capacity });
}
}
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 9529dae3..139e5738 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -1,190 +1,26 @@
-import * as restate from "@restatedev/restate-sdk";
-
import type {
Queue,
QueueOptions,
RunnerFuncs,
RunnerOptions,
} from "@karakeep/shared/queueing";
-import { QueueRetryAfterError } from "@karakeep/shared/queueing";
-import { tryCatch } from "@karakeep/shared/tryCatch";
-import { RestateSemaphore } from "./semaphore";
+import { buildDispatcherService } from "./dispatcher";
+import { buildRunnerService } from "./runner";
+
+export interface RestateServicePair<T, R> {
+ dispatcher: ReturnType<typeof buildDispatcherService<T, R>>;
+ runner: ReturnType<typeof buildRunnerService<T, R>>;
+}
-export function buildRestateService<T, R>(
+export function buildRestateServices<T, R>(
queue: Queue<T>,
funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
queueOpts: QueueOptions,
-) {
- const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
- return restate.service({
- name: queue.name(),
- options: {
- inactivityTimeout: {
- seconds: opts.timeoutSecs,
- },
- retryPolicy: {
- maxAttempts: NUM_RETRIES,
- initialInterval: {
- seconds: 5,
- },
- maxInterval: {
- minutes: 1,
- },
- },
- journalRetention: {
- days: 3,
- },
- },
- handlers: {
- run: async (
- ctx: restate.Context,
- data: {
- payload: T;
- queuedIdempotencyKey?: string;
- priority: number;
- groupId?: string;
- },
- ) => {
- const id = ctx.rand.uuidv4();
- let payload = data.payload;
- if (opts.validator) {
- const res = opts.validator.safeParse(data.payload);
- if (!res.success) {
- throw new restate.TerminalError(res.error.message, {
- errorCode: 400,
- });
- }
- payload = res.data;
- }
-
- const priority = data.priority ?? 0;
-
- const semaphore = new RestateSemaphore(
- ctx,
- `queue:${queue.name()}`,
- opts.concurrency,
- );
-
- let lastError: Error | undefined;
- let runNumber = 0;
- while (runNumber <= NUM_RETRIES) {
- const acquired = await semaphore.acquire(
- priority,
- data.groupId,
- data.queuedIdempotencyKey,
- );
- if (!acquired) {
- return;
- }
- const res = await runWorkerLogic(ctx, funcs, {
- id,
- data: payload,
- priority,
- runNumber,
- numRetriesLeft: NUM_RETRIES - runNumber,
- abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
- });
- await semaphore.release();
-
- if (res.type === "rate_limit") {
- // Handle rate limit retries without counting against retry attempts
- await ctx.sleep(res.delayMs, "rate limit retry");
- // Don't increment runNumber - retry without counting against attempts
- continue;
- }
-
- if (res.type === "error") {
- if (res.error instanceof restate.CancelledError) {
- throw res.error;
- }
- lastError = res.error;
- // TODO: add backoff
- await ctx.sleep(1000, "error retry");
- runNumber++;
- } else {
- lastError = undefined;
- // Success
- break;
- }
- }
- if (lastError) {
- throw new restate.TerminalError(lastError.message, {
- errorCode: 500,
- cause: "cause" in lastError ? lastError.cause : undefined,
- });
- }
- },
- },
- });
-}
-
-type RunResult<R> =
- | { type: "success"; value: R }
- | { type: "rate_limit"; delayMs: number }
- | { type: "error"; error: Error };
-
-async function runWorkerLogic<T, R>(
- ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T, R>,
- data: {
- id: string;
- data: T;
- priority: number;
- runNumber: number;
- numRetriesLeft: number;
- abortSignal: AbortSignal;
- },
-): Promise<RunResult<R>> {
- const res = await tryCatch(
- ctx.run(
- `main logic`,
- async () => {
- const res = await tryCatch(run(data));
- if (res.error) {
- if (res.error instanceof QueueRetryAfterError) {
- return { type: "rate_limit" as const, delayMs: res.error.delayMs };
- }
- throw res.error; // Rethrow
- }
- return { type: "success" as const, value: res.data };
- },
- {
- maxRetryAttempts: 1,
- },
- ),
- );
-
- if (res.error) {
- await tryCatch(
- ctx.run(
- `onError`,
- async () =>
- onError?.({
- ...data,
- error: res.error,
- }),
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- return { type: "error", error: res.error };
- }
-
- const result = res.data;
-
- if (result.type === "rate_limit") {
- // Don't call onError or onComplete for rate limit retries
- return result;
- }
-
- // Success case - call onComplete
- await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data, result.value), {
- maxRetryAttempts: 1,
- }),
- );
- return result;
+): RestateServicePair<T, R> {
+ return {
+ dispatcher: buildDispatcherService<T, R>(queue, opts, queueOpts),
+ runner: buildRunnerService<T, R>(queue.name(), funcs, opts),
+ };
}
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 7da3f18e..3c4cf254 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -56,6 +56,12 @@ type TestAction =
val: number;
delayMs: number;
attemptsBeforeSuccess: number;
+ }
+ | {
+ type: "timeout-then-succeed";
+ val: number;
+ timeoutDurSec: number;
+ attemptsBeforeSuccess: number;
};
describe("Restate Queue Provider", () => {
@@ -70,6 +76,7 @@ describe("Restate Queue Provider", () => {
maxInFlight: 0,
baton: new Baton(),
rateLimitAttempts: new Map<string, number>(),
+ timeoutAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
@@ -90,6 +97,7 @@ describe("Restate Queue Provider", () => {
testState.maxInFlight = 0;
testState.baton = new Baton();
testState.rateLimitAttempts = new Map<string, number>();
+ testState.timeoutAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -157,6 +165,22 @@ describe("Restate Queue Provider", () => {
}
return jobData.val;
}
+ case "timeout-then-succeed": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.timeoutAttempts.get(attemptKey) || 0;
+ testState.timeoutAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ // Stall longer than the timeout to trigger a timeout
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.timeoutDurSec * 1000),
+ );
+ // This should not be reached if timeout works correctly
+ throw new Error("Should have timed out");
+ }
+ return jobData.val;
+ }
}
},
onError: async (job) => {
@@ -612,4 +636,68 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([]);
}, 90000);
});
+
+ describe("Timeout handling", () => {
+ it("should retry timed out jobs and not waste semaphore slots", async () => {
+ // This test verifies that:
+ // 1. Jobs that timeout get retried correctly
+ // 2. Semaphore slots are freed when jobs timeout (via lease expiry)
+ // 3. Other jobs can still run while a job is being retried
+
+ // Enqueue a job that will timeout on first attempt, succeed on second
+ // timeoutSecs is 2, so we stall for 5 seconds to ensure timeout
+ await queue.enqueue({
+ type: "timeout-then-succeed",
+ val: 42,
+ timeoutDurSec: 5,
+ attemptsBeforeSuccess: 1, // Timeout once, then succeed
+ });
+
+ // Wait a bit for the first attempt to start
+ await new Promise((resolve) => setTimeout(resolve, 500));
+
+ // Enqueue more jobs to verify semaphore slots are eventually freed
+ // With concurrency=3, these should be able to run after the timeout
+ await queue.enqueue({ type: "val", val: 100 });
+ await queue.enqueue({ type: "val", val: 101 });
+ await queue.enqueue({ type: "val", val: 102 });
+
+ await waitUntilQueueEmpty();
+
+ // The timeout job should have succeeded after retry
+ expect(testState.results).toContain(42);
+
+ // All other jobs should have completed
+ expect(testState.results).toContain(100);
+ expect(testState.results).toContain(101);
+ expect(testState.results).toContain(102);
+
+ // The timeout job should have been attempted twice
+ const attempts = Array.from(testState.timeoutAttempts.values())[0];
+ expect(attempts).toBe(2);
+
+ // Concurrency should not have exceeded the limit
+ expect(testState.maxInFlight).toBeLessThanOrEqual(3);
+ }, 120000);
+
+ it("should handle job that times out multiple times before succeeding", async () => {
+ // Enqueue a single job that times out twice before succeeding
+ // This tests that the retry mechanism works correctly for timeouts
+ await queue.enqueue({
+ type: "timeout-then-succeed",
+ val: 99,
+ timeoutDurSec: 5,
+ attemptsBeforeSuccess: 2, // Timeout twice, then succeed
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Job should eventually succeed
+ expect(testState.results).toEqual([99]);
+
+ // Should have been attempted 3 times (2 timeouts + 1 success)
+ const attempts = Array.from(testState.timeoutAttempts.values())[0];
+ expect(attempts).toBe(3);
+ }, 180000);
+ });
});
diff --git a/packages/plugins/queue-restate/src/types.ts b/packages/plugins/queue-restate/src/types.ts
new file mode 100644
index 00000000..e406a4d3
--- /dev/null
+++ b/packages/plugins/queue-restate/src/types.ts
@@ -0,0 +1,78 @@
+import { z } from "zod";
+
+/**
+ * Zod schema for serialized errors that cross the RPC boundary (`stack` is optional).
+ */
+export const zSerializedError = z.object({
+ name: z.string(),
+ message: z.string(),
+ stack: z.string().optional(),
+});
+// Static type of a serialized error, inferred from zSerializedError.
+export type SerializedError = z.infer<typeof zSerializedError>;
+
+/**
+ * Builds the Zod schema for job data passed from dispatcher to runner, given the payload's schema.
+ */
+export function zRunnerJobData<T extends z.ZodTypeAny>(payloadSchema: T) {
+ return z.object({
+ id: z.string(),
+ data: payloadSchema,
+ priority: z.number(),
+ runNumber: z.number(),
+ numRetriesLeft: z.number(),
+ timeoutSecs: z.number(),
+ });
+}
+// Job description the dispatcher sends to the runner for a single attempt.
+export interface RunnerJobData<T> {
+ id: string; // job id; the dispatcher reuses it across attempts
+ data: T; // job payload
+ priority: number;
+ runNumber: number; // attempt counter; the dispatcher starts it at 0
+ numRetriesLeft: number; // the dispatcher sets this to its retry limit minus runNumber
+ timeoutSecs: number; // the dispatcher copies this from the runner options
+}
+
+/**
+ * Zod schema for runner.run() response: a union discriminated on `type` (success | rate_limit | error).
+ */
+export const zRunnerResult = z.discriminatedUnion("type", [
+ z.object({
+ type: z.literal("success"),
+ value: z.unknown(),
+ }),
+ z.object({
+ type: z.literal("rate_limit"),
+ delayMs: z.number(),
+ }),
+ z.object({
+ type: z.literal("error"),
+ error: zSerializedError,
+ }),
+]);
+// Outcome of one runner.run() call, tagged by `type`; R is the job's success value.
+export type RunnerResult<R> =
+ | { type: "success"; value: R }
+ | { type: "rate_limit"; delayMs: number } // dispatcher sleeps delayMs and retries without using up an attempt
+ | { type: "error"; error: SerializedError }; // dispatcher calls onError, then retries while attempts remain
+
+/**
+ * Builds the Zod schema for a runner.onCompleted() request; `result` is left as unknown.
+ */
+export function zOnCompletedRequest<T extends z.ZodTypeAny>(payloadSchema: T) {
+ return z.object({
+ job: zRunnerJobData(payloadSchema),
+ result: z.unknown(),
+ });
+}
+
+/**
+ * Builds the Zod schema for a runner.onError() request (job data plus the serialized error).
+ */
+export function zOnErrorRequest<T extends z.ZodTypeAny>(payloadSchema: T) {
+ return z.object({
+ job: zRunnerJobData(payloadSchema),
+ error: zSerializedError,
+ });
+}