Diffstat (limited to 'packages/plugins/queue-restate/src')
-rw-r--r--  packages/plugins/queue-restate/src/dispatcher.ts  189
-rw-r--r--  packages/plugins/queue-restate/src/env.ts  11
-rw-r--r--  packages/plugins/queue-restate/src/index.ts  42
-rw-r--r--  packages/plugins/queue-restate/src/runner.ts  164
-rw-r--r--  packages/plugins/queue-restate/src/semaphore.ts  207
-rw-r--r--  packages/plugins/queue-restate/src/service.ts  154
-rw-r--r--  packages/plugins/queue-restate/src/tests/queue.test.ts  185
-rw-r--r--  packages/plugins/queue-restate/src/types.ts  78
8 files changed, 807 insertions, 223 deletions
diff --git a/packages/plugins/queue-restate/src/dispatcher.ts b/packages/plugins/queue-restate/src/dispatcher.ts
new file mode 100644
index 00000000..6b48a571
--- /dev/null
+++ b/packages/plugins/queue-restate/src/dispatcher.ts
@@ -0,0 +1,189 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type {
+ Queue,
+ QueueOptions,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
+import { runnerServiceName } from "./runner";
+import { RestateSemaphore } from "./semaphore";
+
+export function buildDispatcherService<T, R>(
+ queue: Queue<T>,
+ opts: RunnerOptions<T>,
+ queueOpts: QueueOptions,
+) {
+ const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
+ const runnerName = runnerServiceName(queue.name());
+
+ // Type definition for the runner service client
+ // Note: ctx parameter is required for Restate SDK to correctly infer client method signatures
+ interface RunnerService {
+ run: (
+ ctx: restate.Context,
+ data: RunnerJobData<T>,
+ ) => Promise<RunnerResult<R>>;
+ onCompleted: (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; result: R },
+ ) => Promise<void>;
+ onError: (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; error: SerializedError },
+ ) => Promise<void>;
+ }
+
+ return restate.service({
+ name: queue.name(),
+ options: {
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs * 2,
+ },
+ retryPolicy: {
+ maxAttempts: NUM_RETRIES,
+ initialInterval: {
+ seconds: 5,
+ },
+ maxInterval: {
+ minutes: 1,
+ },
+ },
+ journalRetention: {
+ days: 3,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ queuedIdempotencyKey?: string;
+ priority: number;
+ groupId?: string;
+ },
+ ) => {
+ const id = ctx.rand.uuidv4();
+ const priority = data.priority ?? 0;
+ const logDebug = async (message: string) => {
+ await ctx.run(
+ "log",
+ async () => {
+ logger.debug(message);
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ };
+
+ const semaphore = new RestateSemaphore(
+ ctx,
+ `queue:${queue.name()}`,
+ opts.concurrency,
+ Math.ceil(opts.timeoutSecs * 1.5 * 1000),
+ );
+
+ const runner = ctx.serviceClient<RunnerService>({ name: runnerName });
+
+ let runNumber = 0;
+ while (runNumber <= NUM_RETRIES) {
+ await logDebug(
+ `Dispatcher attempt ${runNumber} for queue ${queue.name()} job ${id} (priority=${priority}, groupId=${data.groupId ?? "none"})`,
+ );
+ const leaseId = await semaphore.acquire(
+ priority,
+ data.groupId,
+ data.queuedIdempotencyKey,
+ );
+ if (!leaseId) {
+ // Idempotency key already exists, skip
+ await logDebug(
+ `Dispatcher skipping queue ${queue.name()} job ${id} due to existing idempotency key`,
+ );
+ return;
+ }
+ await logDebug(
+ `Dispatcher acquired lease ${leaseId} for queue ${queue.name()} job ${id}`,
+ );
+
+ const jobData: RunnerJobData<T> = {
+ id,
+ data: data.payload,
+ priority,
+ runNumber,
+ numRetriesLeft: NUM_RETRIES - runNumber,
+ timeoutSecs: opts.timeoutSecs,
+ };
+
+ // Call the runner service
+ const res = await tryCatch(runner.run(jobData));
+
+ // Handle RPC-level errors (e.g., runner service unavailable)
+ if (res.error) {
+ await logDebug(
+ `Dispatcher RPC error for queue ${queue.name()} job ${id}: ${res.error instanceof Error ? res.error.message : String(res.error)}`,
+ );
+ await semaphore.release(leaseId);
+ if (res.error instanceof restate.CancelledError) {
+ throw res.error;
+ }
+ // Retry with backoff
+ await ctx.sleep(1000, "rpc error retry");
+ runNumber++;
+ continue;
+ }
+
+ const result = res.data;
+
+ if (result.type === "rate_limit") {
+ // Rate limit - release semaphore, sleep, and retry without incrementing runNumber
+ await logDebug(
+ `Dispatcher rate limit for queue ${queue.name()} job ${id} (delayMs=${result.delayMs})`,
+ );
+ await semaphore.release(leaseId);
+ await ctx.sleep(result.delayMs, "rate limit retry");
+ continue;
+ }
+
+ if (result.type === "error") {
+ // Call onError on the runner BEFORE releasing semaphore
+ // This ensures inFlight tracking stays consistent
+ await logDebug(
+ `Dispatcher runner error for queue ${queue.name()} job ${id}: ${result.error.message}`,
+ );
+ await tryCatch(
+ runner.onError({
+ job: jobData,
+ error: result.error,
+ }),
+ );
+ await semaphore.release(leaseId);
+
+ // Retry with backoff
+ await ctx.sleep(1000, "error retry");
+ runNumber++;
+ continue;
+ }
+
+ // Success - call onCompleted BEFORE releasing semaphore
+ // This ensures inFlight tracking stays consistent
+ await logDebug(
+ `Dispatcher completed queue ${queue.name()} job ${id}`,
+ );
+ await tryCatch(
+ runner.onCompleted({
+ job: jobData,
+ result: result.value,
+ }),
+ );
+ await semaphore.release(leaseId);
+ break;
+ }
+ },
+ },
+ });
+}
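Note: the dispatcher above owns the retry loop and only interprets the runner's typed result. As an illustration (not part of the diff), here is a standalone sketch of that decision logic, assuming only the RunnerResult union from types.ts; the helper name decide() is hypothetical and the real logic lives inline in the run handler.

// Illustrative only: mirrors how the dispatcher loop reacts to a RunnerResult.
import type { RunnerResult } from "./types";

type Decision =
  | { action: "done" }
  | { action: "retry"; delayMs: number; countsAsAttempt: boolean };

function decide<R>(result: RunnerResult<R>): Decision {
  switch (result.type) {
    case "success":
      // Completed: onCompleted is invoked, then the lease is released.
      return { action: "done" };
    case "rate_limit":
      // Rate limited: sleep for the requested delay and retry without
      // consuming one of NUM_RETRIES.
      return { action: "retry", delayMs: result.delayMs, countsAsAttempt: false };
    case "error":
      // Runner-level failure: onError is invoked, then retry after a delay,
      // consuming an attempt.
      return { action: "retry", delayMs: 1000, countsAsAttempt: true };
  }
}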
diff --git a/packages/plugins/queue-restate/src/env.ts b/packages/plugins/queue-restate/src/env.ts
index 01175e86..41003460 100644
--- a/packages/plugins/queue-restate/src/env.ts
+++ b/packages/plugins/queue-restate/src/env.ts
@@ -1,5 +1,12 @@
import { z } from "zod";
+const stringBool = (defaultValue: string) =>
+ z
+ .string()
+ .default(defaultValue)
+ .refine((s) => s === "true" || s === "false")
+ .transform((s) => s === "true");
+
export const envConfig = z
.object({
RESTATE_LISTEN_PORT: z.coerce.number().optional(),
@@ -9,5 +16,9 @@ export const envConfig = z
.default("http://localhost:8080"),
RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"),
RESTATE_PUB_KEY: z.string().optional(),
+ RESTATE_EXPOSE_CORE_SERVICES: stringBool("true"),
+ // Deployment mode configuration - allows running dispatchers and runners separately
+ RESTATE_ENABLE_DISPATCHERS: stringBool("true"),
+ RESTATE_ENABLE_RUNNERS: stringBool("true"),
})
.parse(process.env);
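For context on the stringBool helper added above: it accepts only the literal strings "true" and "false", applies the default before validation, and transforms the result into a boolean. A minimal usage sketch, repeating the helper so the snippet is self-contained:

import { z } from "zod";

const stringBool = (defaultValue: string) =>
  z
    .string()
    .default(defaultValue)
    .refine((s) => s === "true" || s === "false")
    .transform((s) => s === "true");

console.log(stringBool("true").parse(undefined)); // true (default applied)
console.log(stringBool("true").parse("false")); // false
console.log(stringBool("true").safeParse("yes").success); // false (rejected by refine)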
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
index fa636f87..f8761291 100644
--- a/packages/plugins/queue-restate/src/index.ts
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -17,7 +17,7 @@ import { AdminClient } from "./admin";
import { envConfig } from "./env";
import { idProvider } from "./idProvider";
import { semaphore } from "./semaphore";
-import { buildRestateService } from "./service";
+import { buildRestateServices } from "./service";
class RestateQueueWrapper<T> implements Queue<T> {
constructor(
@@ -27,6 +27,10 @@ class RestateQueueWrapper<T> implements Queue<T> {
public readonly opts: QueueOptions,
) {}
+ ensureInit(): Promise<void> {
+ return Promise.resolve();
+ }
+
name(): string {
return this._name;
}
@@ -87,12 +91,13 @@ class RestateQueueWrapper<T> implements Queue<T> {
class RestateRunnerWrapper<T> implements Runner<T> {
constructor(
- private readonly wf: restate.ServiceDefinition<
+ private readonly dispatcherDef: restate.ServiceDefinition<
string,
{
run: (ctx: restate.Context, data: T) => Promise<void>;
}
>,
+ private readonly runnerDef: restate.ServiceDefinition<string, unknown>,
) {}
async run(): Promise<void> {
@@ -107,8 +112,12 @@ class RestateRunnerWrapper<T> implements Runner<T> {
throw new Error("Method not implemented.");
}
- get def(): restate.WorkflowDefinition<string, unknown> {
- return this.wf;
+ get dispatcherService(): restate.ServiceDefinition<string, unknown> {
+ return this.dispatcherDef;
+ }
+
+ get runnerService(): restate.ServiceDefinition<string, unknown> {
+ return this.runnerDef;
}
}
@@ -130,13 +139,24 @@ class RestateQueueClient implements QueueClient {
}
async start(): Promise<void> {
+ const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = [];
+
+ for (const svc of this.services.values()) {
+ if (envConfig.RESTATE_ENABLE_DISPATCHERS) {
+ servicesToExpose.push(svc.dispatcherService);
+ }
+ if (envConfig.RESTATE_ENABLE_RUNNERS) {
+ servicesToExpose.push(svc.runnerService);
+ }
+ }
+
+ if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) {
+ servicesToExpose.push(semaphore, idProvider);
+ }
+
const port = await restate.serve({
port: envConfig.RESTATE_LISTEN_PORT ?? 0,
- services: [
- ...[...this.services.values()].map((svc) => svc.def),
- semaphore,
- idProvider,
- ],
+ services: servicesToExpose,
identityKeys: envConfig.RESTATE_PUB_KEY
? [envConfig.RESTATE_PUB_KEY]
: undefined,
@@ -175,8 +195,10 @@ class RestateQueueClient implements QueueClient {
if (wrapper) {
throw new Error(`Queue ${name} already exists`);
}
+ const services = buildRestateServices(queue, funcs, opts, queue.opts);
const svc = new RestateRunnerWrapper<T>(
- buildRestateService(queue, funcs, opts, queue.opts),
+ services.dispatcher,
+ services.runner,
);
this.services.set(name, svc);
return svc;
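The three new flags make it possible to register only dispatchers, only runners, or both in a given process. Below is a speculative sketch of two deployment profiles these flags could support; the exact topology (which process exposes the core Semaphore/IdProvider objects) is an assumption, not something the diff prescribes.

// Hypothetical split-deployment profiles built from the flags added in env.ts.
// A "control" process exposes dispatchers plus the shared core objects; a
// "worker" process exposes only the runner services.
const controlPlaneEnv = {
  RESTATE_ENABLE_DISPATCHERS: "true",
  RESTATE_ENABLE_RUNNERS: "false",
  RESTATE_EXPOSE_CORE_SERVICES: "true",
};

const workerEnv = {
  RESTATE_ENABLE_DISPATCHERS: "false",
  RESTATE_ENABLE_RUNNERS: "true",
  RESTATE_EXPOSE_CORE_SERVICES: "false",
};

// With all three flags left at their "true" defaults, a single process serves
// everything, matching the previous behaviour.
export { controlPlaneEnv, workerEnv };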
diff --git a/packages/plugins/queue-restate/src/runner.ts b/packages/plugins/queue-restate/src/runner.ts
new file mode 100644
index 00000000..2bda9f72
--- /dev/null
+++ b/packages/plugins/queue-restate/src/runner.ts
@@ -0,0 +1,164 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type { RunnerFuncs, RunnerOptions } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import type { RunnerJobData, RunnerResult, SerializedError } from "./types";
+
+function serializeError(error: Error): SerializedError {
+ return {
+ name: error.name,
+ message: error.message,
+ stack: error.stack,
+ };
+}
+
+export function runnerServiceName(queueName: string): string {
+ return `${queueName}-runner`;
+}
+
+export function buildRunnerService<T, R>(
+ queueName: string,
+ funcs: RunnerFuncs<T, R>,
+ opts: RunnerOptions<T>,
+) {
+ return restate.service({
+ name: runnerServiceName(queueName),
+ options: {
+ ingressPrivate: true,
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs * 2,
+ },
+ // No retries at runner level - dispatcher handles retry logic
+ retryPolicy: {
+ maxAttempts: 1,
+ },
+ journalRetention: {
+ days: 3,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ jobData: RunnerJobData<T>,
+ ): Promise<RunnerResult<R>> => {
+ // Validate payload if validator provided
+ let payload = jobData.data;
+ if (opts.validator) {
+ const res = opts.validator.safeParse(jobData.data);
+ if (!res.success) {
+ return {
+ type: "error",
+ error: {
+ name: "ValidationError",
+ message: res.error.message,
+ },
+ };
+ }
+ payload = res.data;
+ }
+
+ const res = await tryCatch(
+ ctx
+ .run(
+ "main logic",
+ async () => {
+ const result = await tryCatch(
+ funcs.run({
+ id: jobData.id,
+ data: payload,
+ priority: jobData.priority,
+ runNumber: jobData.runNumber,
+ abortSignal: AbortSignal.timeout(
+ jobData.timeoutSecs * 1000,
+ ),
+ }),
+ );
+ if (result.error) {
+ if (result.error instanceof QueueRetryAfterError) {
+ return {
+ type: "rate_limit" as const,
+ delayMs: result.error.delayMs,
+ };
+ }
+ throw result.error;
+ }
+ return { type: "success" as const, value: result.data };
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ )
+ .orTimeout({
+ seconds: jobData.timeoutSecs * 1.1,
+ }),
+ );
+
+ if (res.error) {
+ return {
+ type: "error",
+ error: serializeError(res.error),
+ };
+ }
+
+ return res.data as RunnerResult<R>;
+ },
+
+ onCompleted: async (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; result: R },
+ ): Promise<void> => {
+ await ctx.run(
+ "onComplete",
+ async () => {
+ await funcs.onComplete?.(
+ {
+ id: data.job.id,
+ data: data.job.data,
+ priority: data.job.priority,
+ runNumber: data.job.runNumber,
+ abortSignal: AbortSignal.timeout(data.job.timeoutSecs * 1000),
+ },
+ data.result,
+ );
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ },
+
+ onError: async (
+ ctx: restate.Context,
+ data: { job: RunnerJobData<T>; error: SerializedError },
+ ): Promise<void> => {
+ // Reconstruct the error
+ const reconstructedError = Object.assign(
+ new Error(data.error.message),
+ {
+ name: data.error.name,
+ stack: data.error.stack,
+ },
+ );
+
+ await ctx.run(
+ "onError",
+ async () => {
+ await funcs.onError?.({
+ id: data.job.id,
+ data: data.job.data,
+ priority: data.job.priority,
+ runNumber: data.job.runNumber,
+ numRetriesLeft: data.job.numRetriesLeft,
+ error: reconstructedError,
+ });
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ );
+ },
+ },
+ });
+}
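Because Error instances do not survive the RPC boundary between dispatcher and runner, the runner serializes errors and onError rebuilds a plain Error from the wire shape. A minimal round-trip sketch using the SerializedError type from types.ts, showing the same helpers as above in isolation:

import type { SerializedError } from "./types";

function serializeError(error: Error): SerializedError {
  return { name: error.name, message: error.message, stack: error.stack };
}

function reconstructError(data: SerializedError): Error {
  return Object.assign(new Error(data.message), {
    name: data.name,
    stack: data.stack,
  });
}

const wire = serializeError(new RangeError("boom"));
const back = reconstructError(wire);
console.log(back.name, back.message); // "RangeError" "boom"
// Note: `back instanceof RangeError` is false; only name/message/stack survive.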
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
index 694c77b3..72fa938a 100644
--- a/packages/plugins/queue-restate/src/semaphore.ts
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -7,17 +7,20 @@ interface QueueItem {
awakeable: string;
idempotencyKey?: string;
priority: number;
+ leaseDurationMs: number;
}
interface LegacyQueueState {
- items: QueueItem[];
itemsv2: Record<string, GroupState>;
inFlight: number;
+ paused: boolean;
+ leases: Record<string, number>;
}
interface QueueState {
groups: Record<string, GroupState>;
- inFlight: number;
+ paused: boolean;
+ leases: Record<string, number>;
}
interface GroupState {
@@ -29,65 +32,115 @@ interface GroupState {
export const semaphore = object({
name: "Semaphore",
handlers: {
- acquire: async (
- ctx: ObjectContext<LegacyQueueState>,
- req: {
- awakeableId: string;
- priority: number;
- capacity: number;
- groupId?: string;
- idempotencyKey?: string;
+ acquire: restate.handlers.object.exclusive(
+ {
+ ingressPrivate: true,
},
- ): Promise<boolean> => {
- const state = await getState(ctx);
-
- if (
- req.idempotencyKey &&
- idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
- ) {
- return false;
- }
+ async (
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ awakeableId: string;
+ priority: number;
+ capacity: number;
+ leaseDurationMs: number;
+ groupId?: string;
+ idempotencyKey?: string;
+ },
+ ): Promise<boolean> => {
+ const state = await getState(ctx);
- req.groupId = req.groupId ?? "__ungrouped__";
+ if (
+ req.idempotencyKey &&
+ idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey)
+ ) {
+ return false;
+ }
- if (state.groups[req.groupId] === undefined) {
- state.groups[req.groupId] = {
- id: req.groupId,
- items: [],
- lastServedTimestamp: Date.now(),
- };
- }
+ req.groupId = req.groupId ?? "__ungrouped__";
- state.groups[req.groupId].items.push({
- awakeable: req.awakeableId,
- priority: req.priority,
- idempotencyKey: req.idempotencyKey,
- });
+ if (state.groups[req.groupId] === undefined) {
+ state.groups[req.groupId] = {
+ id: req.groupId,
+ items: [],
+ lastServedTimestamp: await ctx.date.now(),
+ };
+ }
- tick(ctx, state, req.capacity);
+ state.groups[req.groupId].items.push({
+ awakeable: req.awakeableId,
+ priority: req.priority,
+ idempotencyKey: req.idempotencyKey,
+ leaseDurationMs: req.leaseDurationMs,
+ });
- setState(ctx, state);
- return true;
- },
-
- release: async (
- ctx: ObjectContext<LegacyQueueState>,
- capacity: number,
- ): Promise<void> => {
- const state = await getState(ctx);
- state.inFlight--;
- tick(ctx, state, capacity);
- setState(ctx, state);
- },
+ await tick(ctx, state, req.capacity);
+
+ setState(ctx, state);
+ return true;
+ },
+ ),
+
+ release: restate.handlers.object.exclusive(
+ {
+ ingressPrivate: true,
+ },
+ async (
+ ctx: ObjectContext<LegacyQueueState>,
+ req: {
+ leaseId: string;
+ capacity: number;
+ },
+ ): Promise<void> => {
+ const state = await getState(ctx);
+ delete state.leases[req.leaseId];
+ await tick(ctx, state, req.capacity);
+ setState(ctx, state);
+ },
+ ),
+ pause: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.paused = true;
+ setState(ctx, state);
+ },
+ ),
+ resume: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.paused = false;
+ await tick(ctx, state, 1);
+ setState(ctx, state);
+ },
+ ),
+ resetInflight: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ state.leases = {};
+ setState(ctx, state);
+ },
+ ),
+ tick: restate.handlers.object.exclusive(
+ {},
+ async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => {
+ const state = await getState(ctx);
+ await tick(ctx, state, 1);
+ setState(ctx, state);
+ },
+ ),
},
options: {
- ingressPrivate: true,
journalRetention: 0,
},
});
// Lower numbers represent higher priority, mirroring Liteque’s semantics.
-function selectAndPopItem(state: QueueState): {
+function selectAndPopItem(
+ state: QueueState,
+ now: number,
+): {
item: QueueItem;
groupId: string;
} {
@@ -122,21 +175,37 @@ function selectAndPopItem(state: QueueState): {
}
const [item] = state.groups[selected.groupId].items.splice(selected.index, 1);
- state.groups[selected.groupId].lastServedTimestamp = Date.now();
+ state.groups[selected.groupId].lastServedTimestamp = now;
if (state.groups[selected.groupId].items.length === 0) {
delete state.groups[selected.groupId];
}
return { item, groupId: selected.groupId };
}
-function tick(
+function pruneExpiredLeases(state: QueueState, now: number) {
+ for (const [leaseId, expiry] of Object.entries(state.leases)) {
+ if (expiry <= now) {
+ delete state.leases[leaseId];
+ }
+ }
+ return Object.keys(state.leases).length;
+}
+
+async function tick(
ctx: ObjectContext<LegacyQueueState>,
state: QueueState,
capacity: number,
-) {
- while (state.inFlight < capacity && Object.keys(state.groups).length > 0) {
- const { item } = selectAndPopItem(state);
- state.inFlight++;
+): Promise<void> {
+ let activeLeases = pruneExpiredLeases(state, await ctx.date.now());
+ while (
+ !state.paused &&
+ activeLeases < capacity &&
+ Object.keys(state.groups).length > 0
+ ) {
+ const now = await ctx.date.now();
+ const { item } = selectAndPopItem(state, now);
+ state.leases[item.awakeable] = now + item.leaseDurationMs;
+ activeLeases++;
ctx.resolveAwakeable(item.awakeable);
}
}
@@ -145,19 +214,13 @@ async function getState(
ctx: ObjectContext<LegacyQueueState>,
): Promise<QueueState> {
const groups = (await ctx.get("itemsv2")) ?? {};
- const items = (await ctx.get("items")) ?? [];
-
- if (items.length > 0) {
- groups["__legacy__"] = {
- id: "__legacy__",
- items,
- lastServedTimestamp: 0,
- };
- }
+ const paused = (await ctx.get("paused")) ?? false;
+ const leases = (await ctx.get("leases")) ?? {};
return {
groups,
- inFlight: (await ctx.get("inFlight")) ?? 0,
+ paused,
+ leases,
};
}
@@ -175,8 +238,8 @@ function idempotencyKeyAlreadyExists(
function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) {
ctx.set("itemsv2", state.groups);
- ctx.set("inFlight", state.inFlight);
- ctx.clear("items");
+ ctx.set("leases", state.leases);
+ ctx.set("paused", state.paused);
}
export class RestateSemaphore {
@@ -184,6 +247,7 @@ export class RestateSemaphore {
private readonly ctx: Context,
private readonly id: string,
private readonly capacity: number,
+ private readonly leaseDurationMs: number,
) {}
async acquire(priority: number, groupId?: string, idempotencyKey?: string) {
@@ -194,6 +258,7 @@ export class RestateSemaphore {
awakeableId: awk.id,
priority,
capacity: this.capacity,
+ leaseDurationMs: this.leaseDurationMs,
groupId,
idempotencyKey,
});
@@ -206,15 +271,15 @@ export class RestateSemaphore {
await awk.promise;
} catch (e) {
if (e instanceof restate.CancelledError) {
- await this.release();
- throw e;
+ await this.release(awk.id);
}
+ throw e;
}
- return true;
+ return awk.id;
}
- async release() {
+ async release(leaseId: string) {
await this.ctx
.objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
- .release(this.capacity);
+ .release({ leaseId, capacity: this.capacity });
}
}
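The semaphore now tracks time-bounded leases instead of a bare inFlight counter, so a holder that crashes or times out stops counting against capacity once its lease expires. A small isolated sketch of that accounting with plain objects; Date.now() stands in for ctx.date.now() here.

// Leases map a lease id to an absolute expiry timestamp; expired entries no
// longer count toward the concurrency capacity.
type Leases = Record<string, number>;

function activeLeaseCount(leases: Leases, now: number): number {
  for (const [leaseId, expiry] of Object.entries(leases)) {
    if (expiry <= now) {
      delete leases[leaseId]; // prune expired holders
    }
  }
  return Object.keys(leases).length;
}

const leases: Leases = {
  "awk-1": Date.now() + 60_000, // still running
  "awk-2": Date.now() - 1_000, // holder timed out or crashed
};

// Only the unexpired lease counts against capacity, so a stuck job can no
// longer wedge the queue indefinitely.
console.log(activeLeaseCount(leases, Date.now())); // 1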
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
index 5ba7d1df..139e5738 100644
--- a/packages/plugins/queue-restate/src/service.ts
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -1,154 +1,26 @@
-import * as restate from "@restatedev/restate-sdk";
-
import type {
Queue,
QueueOptions,
RunnerFuncs,
RunnerOptions,
} from "@karakeep/shared/queueing";
-import { tryCatch } from "@karakeep/shared/tryCatch";
-import { genId } from "./idProvider";
-import { RestateSemaphore } from "./semaphore";
+import { buildDispatcherService } from "./dispatcher";
+import { buildRunnerService } from "./runner";
+
+export interface RestateServicePair<T, R> {
+ dispatcher: ReturnType<typeof buildDispatcherService<T, R>>;
+ runner: ReturnType<typeof buildRunnerService<T, R>>;
+}
-export function buildRestateService<T, R>(
+export function buildRestateServices<T, R>(
queue: Queue<T>,
funcs: RunnerFuncs<T, R>,
opts: RunnerOptions<T>,
queueOpts: QueueOptions,
-) {
- const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
- return restate.service({
- name: queue.name(),
- options: {
- inactivityTimeout: {
- seconds: opts.timeoutSecs,
- },
- retryPolicy: {
- maxAttempts: NUM_RETRIES,
- initialInterval: {
- seconds: 5,
- },
- maxInterval: {
- minutes: 1,
- },
- },
- },
- handlers: {
- run: async (
- ctx: restate.Context,
- data: {
- payload: T;
- queuedIdempotencyKey?: string;
- priority: number;
- groupId?: string;
- },
- ) => {
- const id = `${await genId(ctx)}`;
- let payload = data.payload;
- if (opts.validator) {
- const res = opts.validator.safeParse(data.payload);
- if (!res.success) {
- throw new restate.TerminalError(res.error.message, {
- errorCode: 400,
- });
- }
- payload = res.data;
- }
-
- const priority = data.priority ?? 0;
-
- const semaphore = new RestateSemaphore(
- ctx,
- `queue:${queue.name()}`,
- opts.concurrency,
- );
-
- let lastError: Error | undefined;
- for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
- const acquired = await semaphore.acquire(
- priority,
- data.groupId,
- data.queuedIdempotencyKey,
- );
- if (!acquired) {
- return;
- }
- const res = await runWorkerLogic(ctx, funcs, {
- id,
- data: payload,
- priority,
- runNumber,
- numRetriesLeft: NUM_RETRIES - runNumber,
- abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
- });
- await semaphore.release();
- if (res.error) {
- if (res.error instanceof restate.CancelledError) {
- throw res.error;
- }
- lastError = res.error;
- // TODO: add backoff
- await ctx.sleep(1000);
- } else {
- break;
- }
- }
- if (lastError) {
- throw new restate.TerminalError(lastError.message, {
- errorCode: 500,
- cause: "cause" in lastError ? lastError.cause : undefined,
- });
- }
- },
- },
- });
-}
-
-async function runWorkerLogic<T, R>(
- ctx: restate.Context,
- { run, onError, onComplete }: RunnerFuncs<T, R>,
- data: {
- id: string;
- data: T;
- priority: number;
- runNumber: number;
- numRetriesLeft: number;
- abortSignal: AbortSignal;
- },
-) {
- const res = await tryCatch(
- ctx.run(
- `main logic`,
- async () => {
- return await run(data);
- },
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- if (res.error) {
- await tryCatch(
- ctx.run(
- `onError`,
- async () =>
- onError?.({
- ...data,
- error: res.error,
- }),
- {
- maxRetryAttempts: 1,
- },
- ),
- );
- return res;
- }
-
- await tryCatch(
- ctx.run("onComplete", async () => await onComplete?.(data, res.data), {
- maxRetryAttempts: 1,
- }),
- );
- return res;
+): RestateServicePair<T, R> {
+ return {
+ dispatcher: buildDispatcherService<T, R>(queue, opts, queueOpts),
+ runner: buildRunnerService<T, R>(queue.name(), funcs, opts),
+ };
}
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
index 2085d57b..3c4cf254 100644
--- a/packages/plugins/queue-restate/src/tests/queue.test.ts
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -10,6 +10,7 @@ import {
} from "vitest";
import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+import { QueueRetryAfterError } from "@karakeep/shared/queueing";
import { AdminClient } from "../admin";
import { RestateQueueProvider } from "../index";
@@ -49,7 +50,19 @@ type TestAction =
| { type: "val"; val: number }
| { type: "err"; err: string }
| { type: "stall"; durSec: number }
- | { type: "semaphore-acquire" };
+ | { type: "semaphore-acquire" }
+ | {
+ type: "rate-limit";
+ val: number;
+ delayMs: number;
+ attemptsBeforeSuccess: number;
+ }
+ | {
+ type: "timeout-then-succeed";
+ val: number;
+ timeoutDurSec: number;
+ attemptsBeforeSuccess: number;
+ };
describe("Restate Queue Provider", () => {
let queueClient: QueueClient;
@@ -62,6 +75,8 @@ describe("Restate Queue Provider", () => {
inFlight: 0,
maxInFlight: 0,
baton: new Baton(),
+ rateLimitAttempts: new Map<string, number>(),
+ timeoutAttempts: new Map<string, number>(),
};
async function waitUntilQueueEmpty() {
@@ -81,6 +96,8 @@ describe("Restate Queue Provider", () => {
testState.inFlight = 0;
testState.maxInFlight = 0;
testState.baton = new Baton();
+ testState.rateLimitAttempts = new Map<string, number>();
+ testState.timeoutAttempts = new Map<string, number>();
});
afterEach(async () => {
await waitUntilQueueEmpty();
@@ -133,6 +150,37 @@ describe("Restate Queue Provider", () => {
break;
case "semaphore-acquire":
await testState.baton.acquire();
+ break;
+ case "rate-limit": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.rateLimitAttempts.get(attemptKey) || 0;
+ testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ throw new QueueRetryAfterError(
+ `Rate limited (attempt ${currentAttempts + 1})`,
+ jobData.delayMs,
+ );
+ }
+ return jobData.val;
+ }
+ case "timeout-then-succeed": {
+ const attemptKey = `${job.id}`;
+ const currentAttempts =
+ testState.timeoutAttempts.get(attemptKey) || 0;
+ testState.timeoutAttempts.set(attemptKey, currentAttempts + 1);
+
+ if (currentAttempts < jobData.attemptsBeforeSuccess) {
+ // Stall longer than the timeout to trigger a timeout
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.timeoutDurSec * 1000),
+ );
+ // This should not be reached if timeout works correctly
+ throw new Error("Should have timed out");
+ }
+ return jobData.val;
+ }
}
},
onError: async (job) => {
@@ -517,4 +565,139 @@ describe("Restate Queue Provider", () => {
expect(testState.results).toEqual([102, 101, 100]);
}, 60000);
});
+
+ describe("QueueRetryAfterError handling", () => {
+ it("should retry after delay without counting against retry attempts", async () => {
+ const startTime = Date.now();
+
+ // This job will fail with QueueRetryAfterError twice before succeeding
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 42,
+ delayMs: 500, // 500ms delay
+ attemptsBeforeSuccess: 2, // Fail twice, succeed on third try
+ });
+
+ await waitUntilQueueEmpty();
+
+ const duration = Date.now() - startTime;
+
+ // Should have succeeded
+ expect(testState.results).toEqual([42]);
+
+ // Should have been called 3 times (2 rate limit failures + 1 success)
+ expect(testState.rateLimitAttempts.size).toBe(1);
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(3);
+
+ // Should have waited at least 1 second total (2 x 500ms delays)
+ expect(duration).toBeGreaterThanOrEqual(1000);
+
+ // onError should NOT have been called for rate limit retries
+ expect(testState.errors).toEqual([]);
+ }, 60000);
+
+ it("should not exhaust retries when rate limited", async () => {
+ // This job will be rate limited many more times than the retry limit
+ // but should still eventually succeed
+ await queue.enqueue({
+ type: "rate-limit",
+ val: 100,
+ delayMs: 100, // Short delay for faster test
+ attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit)
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Should have succeeded despite being "retried" more than the limit
+ expect(testState.results).toEqual([100]);
+
+ // Should have been called 11 times (10 rate limit failures + 1 success)
+ const attempts = Array.from(testState.rateLimitAttempts.values())[0];
+ expect(attempts).toBe(11);
+
+ // No errors should have been recorded
+ expect(testState.errors).toEqual([]);
+ }, 90000);
+
+ it("should still respect retry limit for non-rate-limit errors", async () => {
+ // Enqueue a regular error job that should fail permanently
+ await queue.enqueue({ type: "err", err: "Regular error" });
+
+ await waitUntilQueueEmpty();
+
+ // Should have failed 4 times (initial + 3 retries) and not succeeded
+ expect(testState.errors).toEqual([
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ "Regular error",
+ ]);
+ expect(testState.results).toEqual([]);
+ }, 90000);
+ });
+
+ describe("Timeout handling", () => {
+ it("should retry timed out jobs and not waste semaphore slots", async () => {
+ // This test verifies that:
+ // 1. Jobs that timeout get retried correctly
+ // 2. Semaphore slots are freed when jobs timeout (via lease expiry)
+ // 3. Other jobs can still run while a job is being retried
+
+ // Enqueue a job that will timeout on first attempt, succeed on second
+ // timeoutSecs is 2, so we stall for 5 seconds to ensure timeout
+ await queue.enqueue({
+ type: "timeout-then-succeed",
+ val: 42,
+ timeoutDurSec: 5,
+ attemptsBeforeSuccess: 1, // Timeout once, then succeed
+ });
+
+ // Wait a bit for the first attempt to start
+ await new Promise((resolve) => setTimeout(resolve, 500));
+
+ // Enqueue more jobs to verify semaphore slots are eventually freed
+ // With concurrency=3, these should be able to run after the timeout
+ await queue.enqueue({ type: "val", val: 100 });
+ await queue.enqueue({ type: "val", val: 101 });
+ await queue.enqueue({ type: "val", val: 102 });
+
+ await waitUntilQueueEmpty();
+
+ // The timeout job should have succeeded after retry
+ expect(testState.results).toContain(42);
+
+ // All other jobs should have completed
+ expect(testState.results).toContain(100);
+ expect(testState.results).toContain(101);
+ expect(testState.results).toContain(102);
+
+ // The timeout job should have been attempted twice
+ const attempts = Array.from(testState.timeoutAttempts.values())[0];
+ expect(attempts).toBe(2);
+
+ // Concurrency should not have exceeded the limit
+ expect(testState.maxInFlight).toBeLessThanOrEqual(3);
+ }, 120000);
+
+ it("should handle job that times out multiple times before succeeding", async () => {
+ // Enqueue a single job that times out twice before succeeding
+ // This tests that the retry mechanism works correctly for timeouts
+ await queue.enqueue({
+ type: "timeout-then-succeed",
+ val: 99,
+ timeoutDurSec: 5,
+ attemptsBeforeSuccess: 2, // Timeout twice, then succeed
+ });
+
+ await waitUntilQueueEmpty();
+
+ // Job should eventually succeed
+ expect(testState.results).toEqual([99]);
+
+ // Should have been attempted 3 times (2 timeouts + 1 success)
+ const attempts = Array.from(testState.timeoutAttempts.values())[0];
+ expect(attempts).toBe(3);
+ }, 180000);
+ });
});
diff --git a/packages/plugins/queue-restate/src/types.ts b/packages/plugins/queue-restate/src/types.ts
new file mode 100644
index 00000000..e406a4d3
--- /dev/null
+++ b/packages/plugins/queue-restate/src/types.ts
@@ -0,0 +1,78 @@
+import { z } from "zod";
+
+/**
+ * Zod schema for serialized errors that cross the RPC boundary.
+ */
+export const zSerializedError = z.object({
+ name: z.string(),
+ message: z.string(),
+ stack: z.string().optional(),
+});
+
+export type SerializedError = z.infer<typeof zSerializedError>;
+
+/**
+ * Zod schema for job data passed from dispatcher to runner.
+ */
+export function zRunnerJobData<T extends z.ZodTypeAny>(payloadSchema: T) {
+ return z.object({
+ id: z.string(),
+ data: payloadSchema,
+ priority: z.number(),
+ runNumber: z.number(),
+ numRetriesLeft: z.number(),
+ timeoutSecs: z.number(),
+ });
+}
+
+export interface RunnerJobData<T> {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ numRetriesLeft: number;
+ timeoutSecs: number;
+}
+
+/**
+ * Zod schema for runner.run() response.
+ */
+export const zRunnerResult = z.discriminatedUnion("type", [
+ z.object({
+ type: z.literal("success"),
+ value: z.unknown(),
+ }),
+ z.object({
+ type: z.literal("rate_limit"),
+ delayMs: z.number(),
+ }),
+ z.object({
+ type: z.literal("error"),
+ error: zSerializedError,
+ }),
+]);
+
+export type RunnerResult<R> =
+ | { type: "success"; value: R }
+ | { type: "rate_limit"; delayMs: number }
+ | { type: "error"; error: SerializedError };
+
+/**
+ * Zod schema for runner.onCompleted() request.
+ */
+export function zOnCompletedRequest<T extends z.ZodTypeAny>(payloadSchema: T) {
+ return z.object({
+ job: zRunnerJobData(payloadSchema),
+ result: z.unknown(),
+ });
+}
+
+/**
+ * Zod schema for runner.onError() request.
+ */
+export function zOnErrorRequest<T extends z.ZodTypeAny>(payloadSchema: T) {
+ return z.object({
+ job: zRunnerJobData(payloadSchema),
+ error: zSerializedError,
+ });
+}
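A brief usage sketch of zRunnerResult at the RPC boundary; the sample payloads are made up.

import { zRunnerResult } from "./types";

// Parsing narrows on the "type" discriminator.
const res = zRunnerResult.parse({ type: "rate_limit", delayMs: 250 });
if (res.type === "rate_limit") {
  console.log(`retry after ${res.delayMs}ms`); // "retry after 250ms"
}

// Malformed responses are rejected: "name" is required on a SerializedError.
const bad = zRunnerResult.safeParse({ type: "error", error: { message: "x" } });
console.log(bad.success); // false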