Diffstat
| -rw-r--r-- | packages/plugins/queue-restate/src/dispatcher.ts | 189 |
| -rw-r--r-- | packages/plugins/queue-restate/src/env.ts | 11 |
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 42 |
| -rw-r--r-- | packages/plugins/queue-restate/src/runner.ts | 164 |
| -rw-r--r-- | packages/plugins/queue-restate/src/semaphore.ts | 207 |
| -rw-r--r-- | packages/plugins/queue-restate/src/service.ts | 154 |
| -rw-r--r-- | packages/plugins/queue-restate/src/tests/queue.test.ts | 185 |
| -rw-r--r-- | packages/plugins/queue-restate/src/types.ts | 78 |
8 files changed, 807 insertions, 223 deletions
diff --git a/packages/plugins/queue-restate/src/dispatcher.ts b/packages/plugins/queue-restate/src/dispatcher.ts new file mode 100644 index 00000000..6b48a571 --- /dev/null +++ b/packages/plugins/queue-restate/src/dispatcher.ts @@ -0,0 +1,189 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { + Queue, + QueueOptions, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import logger from "@karakeep/shared/logger"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import type { RunnerJobData, RunnerResult, SerializedError } from "./types"; +import { runnerServiceName } from "./runner"; +import { RestateSemaphore } from "./semaphore"; + +export function buildDispatcherService<T, R>( + queue: Queue<T>, + opts: RunnerOptions<T>, + queueOpts: QueueOptions, +) { + const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; + const runnerName = runnerServiceName(queue.name()); + + // Type definition for the runner service client + // Note: ctx parameter is required for Restate SDK to correctly infer client method signatures + interface RunnerService { + run: ( + ctx: restate.Context, + data: RunnerJobData<T>, + ) => Promise<RunnerResult<R>>; + onCompleted: ( + ctx: restate.Context, + data: { job: RunnerJobData<T>; result: R }, + ) => Promise<void>; + onError: ( + ctx: restate.Context, + data: { job: RunnerJobData<T>; error: SerializedError }, + ) => Promise<void>; + } + + return restate.service({ + name: queue.name(), + options: { + inactivityTimeout: { + seconds: opts.timeoutSecs * 2, + }, + retryPolicy: { + maxAttempts: NUM_RETRIES, + initialInterval: { + seconds: 5, + }, + maxInterval: { + minutes: 1, + }, + }, + journalRetention: { + days: 3, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + data: { + payload: T; + queuedIdempotencyKey?: string; + priority: number; + groupId?: string; + }, + ) => { + const id = ctx.rand.uuidv4(); + const priority = data.priority ?? 0; + const logDebug = async (message: string) => { + await ctx.run( + "log", + async () => { + logger.debug(message); + }, + { + maxRetryAttempts: 1, + }, + ); + }; + + const semaphore = new RestateSemaphore( + ctx, + `queue:${queue.name()}`, + opts.concurrency, + Math.ceil(opts.timeoutSecs * 1.5 * 1000), + ); + + const runner = ctx.serviceClient<RunnerService>({ name: runnerName }); + + let runNumber = 0; + while (runNumber <= NUM_RETRIES) { + await logDebug( + `Dispatcher attempt ${runNumber} for queue ${queue.name()} job ${id} (priority=${priority}, groupId=${data.groupId ?? "none"})`, + ); + const leaseId = await semaphore.acquire( + priority, + data.groupId, + data.queuedIdempotencyKey, + ); + if (!leaseId) { + // Idempotency key already exists, skip + await logDebug( + `Dispatcher skipping queue ${queue.name()} job ${id} due to existing idempotency key`, + ); + return; + } + await logDebug( + `Dispatcher acquired lease ${leaseId} for queue ${queue.name()} job ${id}`, + ); + + const jobData: RunnerJobData<T> = { + id, + data: data.payload, + priority, + runNumber, + numRetriesLeft: NUM_RETRIES - runNumber, + timeoutSecs: opts.timeoutSecs, + }; + + // Call the runner service + const res = await tryCatch(runner.run(jobData)); + + // Handle RPC-level errors (e.g., runner service unavailable) + if (res.error) { + await logDebug( + `Dispatcher RPC error for queue ${queue.name()} job ${id}: ${res.error instanceof Error ? 
res.error.message : String(res.error)}`, + ); + await semaphore.release(leaseId); + if (res.error instanceof restate.CancelledError) { + throw res.error; + } + // Retry with backoff + await ctx.sleep(1000, "rpc error retry"); + runNumber++; + continue; + } + + const result = res.data; + + if (result.type === "rate_limit") { + // Rate limit - release semaphore, sleep, and retry without incrementing runNumber + await logDebug( + `Dispatcher rate limit for queue ${queue.name()} job ${id} (delayMs=${result.delayMs})`, + ); + await semaphore.release(leaseId); + await ctx.sleep(result.delayMs, "rate limit retry"); + continue; + } + + if (result.type === "error") { + // Call onError on the runner BEFORE releasing semaphore + // This ensures inFlight tracking stays consistent + await logDebug( + `Dispatcher runner error for queue ${queue.name()} job ${id}: ${result.error.message}`, + ); + await tryCatch( + runner.onError({ + job: jobData, + error: result.error, + }), + ); + await semaphore.release(leaseId); + + // Retry with backoff + await ctx.sleep(1000, "error retry"); + runNumber++; + continue; + } + + // Success - call onCompleted BEFORE releasing semaphore + // This ensures inFlight tracking stays consistent + await logDebug( + `Dispatcher completed queue ${queue.name()} job ${id}`, + ); + await tryCatch( + runner.onCompleted({ + job: jobData, + result: result.value, + }), + ); + await semaphore.release(leaseId); + break; + } + }, + }, + }); +} diff --git a/packages/plugins/queue-restate/src/env.ts b/packages/plugins/queue-restate/src/env.ts index 01175e86..41003460 100644 --- a/packages/plugins/queue-restate/src/env.ts +++ b/packages/plugins/queue-restate/src/env.ts @@ -1,5 +1,12 @@ import { z } from "zod"; +const stringBool = (defaultValue: string) => + z + .string() + .default(defaultValue) + .refine((s) => s === "true" || s === "false") + .transform((s) => s === "true"); + export const envConfig = z .object({ RESTATE_LISTEN_PORT: z.coerce.number().optional(), @@ -9,5 +16,9 @@ export const envConfig = z .default("http://localhost:8080"), RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"), RESTATE_PUB_KEY: z.string().optional(), + RESTATE_EXPOSE_CORE_SERVICES: stringBool("true"), + // Deployment mode configuration - allows running dispatchers and runners separately + RESTATE_ENABLE_DISPATCHERS: stringBool("true"), + RESTATE_ENABLE_RUNNERS: stringBool("true"), }) .parse(process.env); diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index fa636f87..f8761291 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -17,7 +17,7 @@ import { AdminClient } from "./admin"; import { envConfig } from "./env"; import { idProvider } from "./idProvider"; import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; +import { buildRestateServices } from "./service"; class RestateQueueWrapper<T> implements Queue<T> { constructor( @@ -27,6 +27,10 @@ class RestateQueueWrapper<T> implements Queue<T> { public readonly opts: QueueOptions, ) {} + ensureInit(): Promise<void> { + return Promise.resolve(); + } + name(): string { return this._name; } @@ -87,12 +91,13 @@ class RestateQueueWrapper<T> implements Queue<T> { class RestateRunnerWrapper<T> implements Runner<T> { constructor( - private readonly wf: restate.ServiceDefinition< + private readonly dispatcherDef: restate.ServiceDefinition< string, { run: (ctx: restate.Context, data: T) => Promise<void>; 
} >, + private readonly runnerDef: restate.ServiceDefinition<string, unknown>, ) {} async run(): Promise<void> { @@ -107,8 +112,12 @@ class RestateRunnerWrapper<T> implements Runner<T> { throw new Error("Method not implemented."); } - get def(): restate.WorkflowDefinition<string, unknown> { - return this.wf; + get dispatcherService(): restate.ServiceDefinition<string, unknown> { + return this.dispatcherDef; + } + + get runnerService(): restate.ServiceDefinition<string, unknown> { + return this.runnerDef; } } @@ -130,13 +139,24 @@ class RestateQueueClient implements QueueClient { } async start(): Promise<void> { + const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = []; + + for (const svc of this.services.values()) { + if (envConfig.RESTATE_ENABLE_DISPATCHERS) { + servicesToExpose.push(svc.dispatcherService); + } + if (envConfig.RESTATE_ENABLE_RUNNERS) { + servicesToExpose.push(svc.runnerService); + } + } + + if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) { + servicesToExpose.push(semaphore, idProvider); + } + const port = await restate.serve({ port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - semaphore, - idProvider, - ], + services: servicesToExpose, identityKeys: envConfig.RESTATE_PUB_KEY ? [envConfig.RESTATE_PUB_KEY] : undefined, @@ -175,8 +195,10 @@ class RestateQueueClient implements QueueClient { if (wrapper) { throw new Error(`Queue ${name} already exists`); } + const services = buildRestateServices(queue, funcs, opts, queue.opts); const svc = new RestateRunnerWrapper<T>( - buildRestateService(queue, funcs, opts, queue.opts), + services.dispatcher, + services.runner, ); this.services.set(name, svc); return svc; diff --git a/packages/plugins/queue-restate/src/runner.ts b/packages/plugins/queue-restate/src/runner.ts new file mode 100644 index 00000000..2bda9f72 --- /dev/null +++ b/packages/plugins/queue-restate/src/runner.ts @@ -0,0 +1,164 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { RunnerFuncs, RunnerOptions } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import type { RunnerJobData, RunnerResult, SerializedError } from "./types"; + +function serializeError(error: Error): SerializedError { + return { + name: error.name, + message: error.message, + stack: error.stack, + }; +} + +export function runnerServiceName(queueName: string): string { + return `${queueName}-runner`; +} + +export function buildRunnerService<T, R>( + queueName: string, + funcs: RunnerFuncs<T, R>, + opts: RunnerOptions<T>, +) { + return restate.service({ + name: runnerServiceName(queueName), + options: { + ingressPrivate: true, + inactivityTimeout: { + seconds: opts.timeoutSecs * 2, + }, + // No retries at runner level - dispatcher handles retry logic + retryPolicy: { + maxAttempts: 1, + }, + journalRetention: { + days: 3, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + jobData: RunnerJobData<T>, + ): Promise<RunnerResult<R>> => { + // Validate payload if validator provided + let payload = jobData.data; + if (opts.validator) { + const res = opts.validator.safeParse(jobData.data); + if (!res.success) { + return { + type: "error", + error: { + name: "ValidationError", + message: res.error.message, + }, + }; + } + payload = res.data; + } + + const res = await tryCatch( + ctx + .run( + "main logic", + async () => { + const result = await tryCatch( + funcs.run({ + id: jobData.id, + 
data: payload, + priority: jobData.priority, + runNumber: jobData.runNumber, + abortSignal: AbortSignal.timeout( + jobData.timeoutSecs * 1000, + ), + }), + ); + if (result.error) { + if (result.error instanceof QueueRetryAfterError) { + return { + type: "rate_limit" as const, + delayMs: result.error.delayMs, + }; + } + throw result.error; + } + return { type: "success" as const, value: result.data }; + }, + { + maxRetryAttempts: 1, + }, + ) + .orTimeout({ + seconds: jobData.timeoutSecs * 1.1, + }), + ); + + if (res.error) { + return { + type: "error", + error: serializeError(res.error), + }; + } + + return res.data as RunnerResult<R>; + }, + + onCompleted: async ( + ctx: restate.Context, + data: { job: RunnerJobData<T>; result: R }, + ): Promise<void> => { + await ctx.run( + "onComplete", + async () => { + await funcs.onComplete?.( + { + id: data.job.id, + data: data.job.data, + priority: data.job.priority, + runNumber: data.job.runNumber, + abortSignal: AbortSignal.timeout(data.job.timeoutSecs * 1000), + }, + data.result, + ); + }, + { + maxRetryAttempts: 1, + }, + ); + }, + + onError: async ( + ctx: restate.Context, + data: { job: RunnerJobData<T>; error: SerializedError }, + ): Promise<void> => { + // Reconstruct the error + const reconstructedError = Object.assign( + new Error(data.error.message), + { + name: data.error.name, + stack: data.error.stack, + }, + ); + + await ctx.run( + "onError", + async () => { + await funcs.onError?.({ + id: data.job.id, + data: data.job.data, + priority: data.job.priority, + runNumber: data.job.runNumber, + numRetriesLeft: data.job.numRetriesLeft, + error: reconstructedError, + }); + }, + { + maxRetryAttempts: 1, + }, + ); + }, + }, + }); +} diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts index 694c77b3..72fa938a 100644 --- a/packages/plugins/queue-restate/src/semaphore.ts +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -7,17 +7,20 @@ interface QueueItem { awakeable: string; idempotencyKey?: string; priority: number; + leaseDurationMs: number; } interface LegacyQueueState { - items: QueueItem[]; itemsv2: Record<string, GroupState>; inFlight: number; + paused: boolean; + leases: Record<string, number>; } interface QueueState { groups: Record<string, GroupState>; - inFlight: number; + paused: boolean; + leases: Record<string, number>; } interface GroupState { @@ -29,65 +32,115 @@ interface GroupState { export const semaphore = object({ name: "Semaphore", handlers: { - acquire: async ( - ctx: ObjectContext<LegacyQueueState>, - req: { - awakeableId: string; - priority: number; - capacity: number; - groupId?: string; - idempotencyKey?: string; + acquire: restate.handlers.object.exclusive( + { + ingressPrivate: true, }, - ): Promise<boolean> => { - const state = await getState(ctx); - - if ( - req.idempotencyKey && - idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey) - ) { - return false; - } + async ( + ctx: ObjectContext<LegacyQueueState>, + req: { + awakeableId: string; + priority: number; + capacity: number; + leaseDurationMs: number; + groupId?: string; + idempotencyKey?: string; + }, + ): Promise<boolean> => { + const state = await getState(ctx); - req.groupId = req.groupId ?? 
"__ungrouped__"; + if ( + req.idempotencyKey && + idempotencyKeyAlreadyExists(state.groups, req.idempotencyKey) + ) { + return false; + } - if (state.groups[req.groupId] === undefined) { - state.groups[req.groupId] = { - id: req.groupId, - items: [], - lastServedTimestamp: Date.now(), - }; - } + req.groupId = req.groupId ?? "__ungrouped__"; - state.groups[req.groupId].items.push({ - awakeable: req.awakeableId, - priority: req.priority, - idempotencyKey: req.idempotencyKey, - }); + if (state.groups[req.groupId] === undefined) { + state.groups[req.groupId] = { + id: req.groupId, + items: [], + lastServedTimestamp: await ctx.date.now(), + }; + } - tick(ctx, state, req.capacity); + state.groups[req.groupId].items.push({ + awakeable: req.awakeableId, + priority: req.priority, + idempotencyKey: req.idempotencyKey, + leaseDurationMs: req.leaseDurationMs, + }); - setState(ctx, state); - return true; - }, - - release: async ( - ctx: ObjectContext<LegacyQueueState>, - capacity: number, - ): Promise<void> => { - const state = await getState(ctx); - state.inFlight--; - tick(ctx, state, capacity); - setState(ctx, state); - }, + await tick(ctx, state, req.capacity); + + setState(ctx, state); + return true; + }, + ), + + release: restate.handlers.object.exclusive( + { + ingressPrivate: true, + }, + async ( + ctx: ObjectContext<LegacyQueueState>, + req: { + leaseId: string; + capacity: number; + }, + ): Promise<void> => { + const state = await getState(ctx); + delete state.leases[req.leaseId]; + await tick(ctx, state, req.capacity); + setState(ctx, state); + }, + ), + pause: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + state.paused = true; + setState(ctx, state); + }, + ), + resume: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + state.paused = false; + await tick(ctx, state, 1); + setState(ctx, state); + }, + ), + resetInflight: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + state.leases = {}; + setState(ctx, state); + }, + ), + tick: restate.handlers.object.exclusive( + {}, + async (ctx: ObjectContext<LegacyQueueState>): Promise<void> => { + const state = await getState(ctx); + await tick(ctx, state, 1); + setState(ctx, state); + }, + ), }, options: { - ingressPrivate: true, journalRetention: 0, }, }); // Lower numbers represent higher priority, mirroring Liteque’s semantics. 
-function selectAndPopItem(state: QueueState): { +function selectAndPopItem( + state: QueueState, + now: number, +): { item: QueueItem; groupId: string; } { @@ -122,21 +175,37 @@ function selectAndPopItem(state: QueueState): { } const [item] = state.groups[selected.groupId].items.splice(selected.index, 1); - state.groups[selected.groupId].lastServedTimestamp = Date.now(); + state.groups[selected.groupId].lastServedTimestamp = now; if (state.groups[selected.groupId].items.length === 0) { delete state.groups[selected.groupId]; } return { item, groupId: selected.groupId }; } -function tick( +function pruneExpiredLeases(state: QueueState, now: number) { + for (const [leaseId, expiry] of Object.entries(state.leases)) { + if (expiry <= now) { + delete state.leases[leaseId]; + } + } + return Object.keys(state.leases).length; +} + +async function tick( ctx: ObjectContext<LegacyQueueState>, state: QueueState, capacity: number, -) { - while (state.inFlight < capacity && Object.keys(state.groups).length > 0) { - const { item } = selectAndPopItem(state); - state.inFlight++; +): Promise<void> { + let activeLeases = pruneExpiredLeases(state, await ctx.date.now()); + while ( + !state.paused && + activeLeases < capacity && + Object.keys(state.groups).length > 0 + ) { + const now = await ctx.date.now(); + const { item } = selectAndPopItem(state, now); + state.leases[item.awakeable] = now + item.leaseDurationMs; + activeLeases++; ctx.resolveAwakeable(item.awakeable); } } @@ -145,19 +214,13 @@ async function getState( ctx: ObjectContext<LegacyQueueState>, ): Promise<QueueState> { const groups = (await ctx.get("itemsv2")) ?? {}; - const items = (await ctx.get("items")) ?? []; - - if (items.length > 0) { - groups["__legacy__"] = { - id: "__legacy__", - items, - lastServedTimestamp: 0, - }; - } + const paused = (await ctx.get("paused")) ?? false; + const leases = (await ctx.get("leases")) ?? {}; return { groups, - inFlight: (await ctx.get("inFlight")) ?? 
0, + paused, + leases, }; } @@ -175,8 +238,8 @@ function idempotencyKeyAlreadyExists( function setState(ctx: ObjectContext<LegacyQueueState>, state: QueueState) { ctx.set("itemsv2", state.groups); - ctx.set("inFlight", state.inFlight); - ctx.clear("items"); + ctx.set("leases", state.leases); + ctx.set("paused", state.paused); } export class RestateSemaphore { @@ -184,6 +247,7 @@ export class RestateSemaphore { private readonly ctx: Context, private readonly id: string, private readonly capacity: number, + private readonly leaseDurationMs: number, ) {} async acquire(priority: number, groupId?: string, idempotencyKey?: string) { @@ -194,6 +258,7 @@ export class RestateSemaphore { awakeableId: awk.id, priority, capacity: this.capacity, + leaseDurationMs: this.leaseDurationMs, groupId, idempotencyKey, }); @@ -206,15 +271,15 @@ export class RestateSemaphore { await awk.promise; } catch (e) { if (e instanceof restate.CancelledError) { - await this.release(); - throw e; + await this.release(awk.id); } + throw e; } - return true; + return awk.id; } - async release() { + async release(leaseId: string) { await this.ctx .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) - .release(this.capacity); + .release({ leaseId, capacity: this.capacity }); } } diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts index 5ba7d1df..139e5738 100644 --- a/packages/plugins/queue-restate/src/service.ts +++ b/packages/plugins/queue-restate/src/service.ts @@ -1,154 +1,26 @@ -import * as restate from "@restatedev/restate-sdk"; - import type { Queue, QueueOptions, RunnerFuncs, RunnerOptions, } from "@karakeep/shared/queueing"; -import { tryCatch } from "@karakeep/shared/tryCatch"; -import { genId } from "./idProvider"; -import { RestateSemaphore } from "./semaphore"; +import { buildDispatcherService } from "./dispatcher"; +import { buildRunnerService } from "./runner"; + +export interface RestateServicePair<T, R> { + dispatcher: ReturnType<typeof buildDispatcherService<T, R>>; + runner: ReturnType<typeof buildRunnerService<T, R>>; +} -export function buildRestateService<T, R>( +export function buildRestateServices<T, R>( queue: Queue<T>, funcs: RunnerFuncs<T, R>, opts: RunnerOptions<T>, queueOpts: QueueOptions, -) { - const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; - return restate.service({ - name: queue.name(), - options: { - inactivityTimeout: { - seconds: opts.timeoutSecs, - }, - retryPolicy: { - maxAttempts: NUM_RETRIES, - initialInterval: { - seconds: 5, - }, - maxInterval: { - minutes: 1, - }, - }, - }, - handlers: { - run: async ( - ctx: restate.Context, - data: { - payload: T; - queuedIdempotencyKey?: string; - priority: number; - groupId?: string; - }, - ) => { - const id = `${await genId(ctx)}`; - let payload = data.payload; - if (opts.validator) { - const res = opts.validator.safeParse(data.payload); - if (!res.success) { - throw new restate.TerminalError(res.error.message, { - errorCode: 400, - }); - } - payload = res.data; - } - - const priority = data.priority ?? 
0; - - const semaphore = new RestateSemaphore( - ctx, - `queue:${queue.name()}`, - opts.concurrency, - ); - - let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { - const acquired = await semaphore.acquire( - priority, - data.groupId, - data.queuedIdempotencyKey, - ); - if (!acquired) { - return; - } - const res = await runWorkerLogic(ctx, funcs, { - id, - data: payload, - priority, - runNumber, - numRetriesLeft: NUM_RETRIES - runNumber, - abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), - }); - await semaphore.release(); - if (res.error) { - if (res.error instanceof restate.CancelledError) { - throw res.error; - } - lastError = res.error; - // TODO: add backoff - await ctx.sleep(1000); - } else { - break; - } - } - if (lastError) { - throw new restate.TerminalError(lastError.message, { - errorCode: 500, - cause: "cause" in lastError ? lastError.cause : undefined, - }); - } - }, - }, - }); -} - -async function runWorkerLogic<T, R>( - ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs<T, R>, - data: { - id: string; - data: T; - priority: number; - runNumber: number; - numRetriesLeft: number; - abortSignal: AbortSignal; - }, -) { - const res = await tryCatch( - ctx.run( - `main logic`, - async () => { - return await run(data); - }, - { - maxRetryAttempts: 1, - }, - ), - ); - if (res.error) { - await tryCatch( - ctx.run( - `onError`, - async () => - onError?.({ - ...data, - error: res.error, - }), - { - maxRetryAttempts: 1, - }, - ), - ); - return res; - } - - await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data, res.data), { - maxRetryAttempts: 1, - }), - ); - return res; +): RestateServicePair<T, R> { + return { + dispatcher: buildDispatcherService<T, R>(queue, opts, queueOpts), + runner: buildRunnerService<T, R>(queue.name(), funcs, opts), + }; } diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts index 2085d57b..3c4cf254 100644 --- a/packages/plugins/queue-restate/src/tests/queue.test.ts +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -10,6 +10,7 @@ import { } from "vitest"; import type { Queue, QueueClient } from "@karakeep/shared/queueing"; +import { QueueRetryAfterError } from "@karakeep/shared/queueing"; import { AdminClient } from "../admin"; import { RestateQueueProvider } from "../index"; @@ -49,7 +50,19 @@ type TestAction = | { type: "val"; val: number } | { type: "err"; err: string } | { type: "stall"; durSec: number } - | { type: "semaphore-acquire" }; + | { type: "semaphore-acquire" } + | { + type: "rate-limit"; + val: number; + delayMs: number; + attemptsBeforeSuccess: number; + } + | { + type: "timeout-then-succeed"; + val: number; + timeoutDurSec: number; + attemptsBeforeSuccess: number; + }; describe("Restate Queue Provider", () => { let queueClient: QueueClient; @@ -62,6 +75,8 @@ describe("Restate Queue Provider", () => { inFlight: 0, maxInFlight: 0, baton: new Baton(), + rateLimitAttempts: new Map<string, number>(), + timeoutAttempts: new Map<string, number>(), }; async function waitUntilQueueEmpty() { @@ -81,6 +96,8 @@ describe("Restate Queue Provider", () => { testState.inFlight = 0; testState.maxInFlight = 0; testState.baton = new Baton(); + testState.rateLimitAttempts = new Map<string, number>(); + testState.timeoutAttempts = new Map<string, number>(); }); afterEach(async () => { await waitUntilQueueEmpty(); @@ -133,6 +150,37 @@ describe("Restate Queue Provider", () => { break; case 
"semaphore-acquire": await testState.baton.acquire(); + break; + case "rate-limit": { + const attemptKey = `${job.id}`; + const currentAttempts = + testState.rateLimitAttempts.get(attemptKey) || 0; + testState.rateLimitAttempts.set(attemptKey, currentAttempts + 1); + + if (currentAttempts < jobData.attemptsBeforeSuccess) { + throw new QueueRetryAfterError( + `Rate limited (attempt ${currentAttempts + 1})`, + jobData.delayMs, + ); + } + return jobData.val; + } + case "timeout-then-succeed": { + const attemptKey = `${job.id}`; + const currentAttempts = + testState.timeoutAttempts.get(attemptKey) || 0; + testState.timeoutAttempts.set(attemptKey, currentAttempts + 1); + + if (currentAttempts < jobData.attemptsBeforeSuccess) { + // Stall longer than the timeout to trigger a timeout + await new Promise((resolve) => + setTimeout(resolve, jobData.timeoutDurSec * 1000), + ); + // This should not be reached if timeout works correctly + throw new Error("Should have timed out"); + } + return jobData.val; + } } }, onError: async (job) => { @@ -517,4 +565,139 @@ describe("Restate Queue Provider", () => { expect(testState.results).toEqual([102, 101, 100]); }, 60000); }); + + describe("QueueRetryAfterError handling", () => { + it("should retry after delay without counting against retry attempts", async () => { + const startTime = Date.now(); + + // This job will fail with QueueRetryAfterError twice before succeeding + await queue.enqueue({ + type: "rate-limit", + val: 42, + delayMs: 500, // 500ms delay + attemptsBeforeSuccess: 2, // Fail twice, succeed on third try + }); + + await waitUntilQueueEmpty(); + + const duration = Date.now() - startTime; + + // Should have succeeded + expect(testState.results).toEqual([42]); + + // Should have been called 3 times (2 rate limit failures + 1 success) + expect(testState.rateLimitAttempts.size).toBe(1); + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(3); + + // Should have waited at least 1 second total (2 x 500ms delays) + expect(duration).toBeGreaterThanOrEqual(1000); + + // onError should NOT have been called for rate limit retries + expect(testState.errors).toEqual([]); + }, 60000); + + it("should not exhaust retries when rate limited", async () => { + // This job will be rate limited many more times than the retry limit + // but should still eventually succeed + await queue.enqueue({ + type: "rate-limit", + val: 100, + delayMs: 100, // Short delay for faster test + attemptsBeforeSuccess: 10, // Fail 10 times (more than the 3 retry limit) + }); + + await waitUntilQueueEmpty(); + + // Should have succeeded despite being "retried" more than the limit + expect(testState.results).toEqual([100]); + + // Should have been called 11 times (10 rate limit failures + 1 success) + const attempts = Array.from(testState.rateLimitAttempts.values())[0]; + expect(attempts).toBe(11); + + // No errors should have been recorded + expect(testState.errors).toEqual([]); + }, 90000); + + it("should still respect retry limit for non-rate-limit errors", async () => { + // Enqueue a regular error job that should fail permanently + await queue.enqueue({ type: "err", err: "Regular error" }); + + await waitUntilQueueEmpty(); + + // Should have failed 4 times (initial + 3 retries) and not succeeded + expect(testState.errors).toEqual([ + "Regular error", + "Regular error", + "Regular error", + "Regular error", + ]); + expect(testState.results).toEqual([]); + }, 90000); + }); + + describe("Timeout handling", () => { + it("should retry timed out 
jobs and not waste semaphore slots", async () => { + // This test verifies that: + // 1. Jobs that timeout get retried correctly + // 2. Semaphore slots are freed when jobs timeout (via lease expiry) + // 3. Other jobs can still run while a job is being retried + + // Enqueue a job that will timeout on first attempt, succeed on second + // timeoutSecs is 2, so we stall for 5 seconds to ensure timeout + await queue.enqueue({ + type: "timeout-then-succeed", + val: 42, + timeoutDurSec: 5, + attemptsBeforeSuccess: 1, // Timeout once, then succeed + }); + + // Wait a bit for the first attempt to start + await new Promise((resolve) => setTimeout(resolve, 500)); + + // Enqueue more jobs to verify semaphore slots are eventually freed + // With concurrency=3, these should be able to run after the timeout + await queue.enqueue({ type: "val", val: 100 }); + await queue.enqueue({ type: "val", val: 101 }); + await queue.enqueue({ type: "val", val: 102 }); + + await waitUntilQueueEmpty(); + + // The timeout job should have succeeded after retry + expect(testState.results).toContain(42); + + // All other jobs should have completed + expect(testState.results).toContain(100); + expect(testState.results).toContain(101); + expect(testState.results).toContain(102); + + // The timeout job should have been attempted twice + const attempts = Array.from(testState.timeoutAttempts.values())[0]; + expect(attempts).toBe(2); + + // Concurrency should not have exceeded the limit + expect(testState.maxInFlight).toBeLessThanOrEqual(3); + }, 120000); + + it("should handle job that times out multiple times before succeeding", async () => { + // Enqueue a single job that times out twice before succeeding + // This tests that the retry mechanism works correctly for timeouts + await queue.enqueue({ + type: "timeout-then-succeed", + val: 99, + timeoutDurSec: 5, + attemptsBeforeSuccess: 2, // Timeout twice, then succeed + }); + + await waitUntilQueueEmpty(); + + // Job should eventually succeed + expect(testState.results).toEqual([99]); + + // Should have been attempted 3 times (2 timeouts + 1 success) + const attempts = Array.from(testState.timeoutAttempts.values())[0]; + expect(attempts).toBe(3); + }, 180000); + }); }); diff --git a/packages/plugins/queue-restate/src/types.ts b/packages/plugins/queue-restate/src/types.ts new file mode 100644 index 00000000..e406a4d3 --- /dev/null +++ b/packages/plugins/queue-restate/src/types.ts @@ -0,0 +1,78 @@ +import { z } from "zod"; + +/** + * Zod schema for serialized errors that cross the RPC boundary. + */ +export const zSerializedError = z.object({ + name: z.string(), + message: z.string(), + stack: z.string().optional(), +}); + +export type SerializedError = z.infer<typeof zSerializedError>; + +/** + * Zod schema for job data passed from dispatcher to runner. + */ +export function zRunnerJobData<T extends z.ZodTypeAny>(payloadSchema: T) { + return z.object({ + id: z.string(), + data: payloadSchema, + priority: z.number(), + runNumber: z.number(), + numRetriesLeft: z.number(), + timeoutSecs: z.number(), + }); +} + +export interface RunnerJobData<T> { + id: string; + data: T; + priority: number; + runNumber: number; + numRetriesLeft: number; + timeoutSecs: number; +} + +/** + * Zod schema for runner.run() response. 
+ */ +export const zRunnerResult = z.discriminatedUnion("type", [ + z.object({ + type: z.literal("success"), + value: z.unknown(), + }), + z.object({ + type: z.literal("rate_limit"), + delayMs: z.number(), + }), + z.object({ + type: z.literal("error"), + error: zSerializedError, + }), +]); + +export type RunnerResult<R> = + | { type: "success"; value: R } + | { type: "rate_limit"; delayMs: number } + | { type: "error"; error: SerializedError }; + +/** + * Zod schema for runner.onCompleted() request. + */ +export function zOnCompletedRequest<T extends z.ZodTypeAny>(payloadSchema: T) { + return z.object({ + job: zRunnerJobData(payloadSchema), + result: z.unknown(), + }); +} + +/** + * Zod schema for runner.onError() request. + */ +export function zOnErrorRequest<T extends z.ZodTypeAny>(payloadSchema: T) { + return z.object({ + job: zRunnerJobData(payloadSchema), + error: zSerializedError, + }); +} |
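The new types.ts above fixes the wire format between dispatcher and runner. As a minimal, hypothetical usage sketch (not part of the diff: the crawl payload schema, the helper names, and the relative import path are invented for illustration), this is how a payload-specific job schema and the RunnerResult discriminated union are meant to compose:

```ts
// Hypothetical usage sketch — the payload schema and helpers below are not part
// of the diff; the import path assumes the types module shown above.
import { z } from "zod";

import type { RunnerResult } from "./types";
import { zRunnerJobData } from "./types";

// Invented example payload for illustration.
const zCrawlPayload = z.object({ url: z.string().url() });

// zRunnerJobData is generic over the payload schema, so parsing yields a job
// envelope whose `data` field is typed as the payload.
const zCrawlJob = zRunnerJobData(zCrawlPayload);
type CrawlJob = z.infer<typeof zCrawlJob>;

export function parseCrawlJob(raw: unknown): CrawlJob {
  return zCrawlJob.parse(raw);
}

export function summarize(result: RunnerResult<string>): string {
  // Mirrors zRunnerResult: every runner response is success, rate_limit, or
  // error, which is what lets the dispatcher branch on the result instead of
  // catching thrown exceptions across the RPC boundary.
  switch (result.type) {
    case "success":
      return `ok: ${result.value}`;
    case "rate_limit":
      return `retry in ${result.delayMs}ms`;
    case "error":
      return `failed: ${result.error.name}: ${result.error.message}`;
  }
}
```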
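The reworked semaphore replaces the inFlight counter with per-lease expiries, and RestateSemaphore.acquire now returns a lease id that must be passed back to release. A minimal sketch of that contract follows, assuming a Restate handler with a ctx in scope; the key, capacity, and lease-duration values are illustrative, and the try/finally shape differs from the dispatcher in this diff, which releases explicitly after its completion callbacks.

```ts
import * as restate from "@restatedev/restate-sdk";

import { RestateSemaphore } from "./semaphore";

// Sketch only: the surrounding handler and all values are assumptions, not the
// dispatcher's actual wiring.
async function withSlot(ctx: restate.Context) {
  const semaphore = new RestateSemaphore(
    ctx,
    "queue:example", // one semaphore object per queue
    3,               // capacity: at most 3 leases outstanding
    60_000,          // lease duration in ms; expired leases are reclaimed by tick()
  );

  // acquire() suspends until a slot is granted; a falsy return means the
  // idempotency key was already seen and the job should be skipped.
  const leaseId = await semaphore.acquire(/* priority */ 0);
  if (!leaseId) {
    return;
  }
  try {
    // ... guarded work ...
  } finally {
    // Releasing returns the slot immediately; if the holder stalls or dies,
    // the lease simply expires and the slot is freed on a later tick.
    await semaphore.release(leaseId);
  }
}
```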
