From f48e98e16ae588ee5004531bf9a5aed757ed3786 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 10 Jan 2026 15:31:30 +0000 Subject: fix: harden the restate implementation (#2370) * fix: parallelize queue enqueues in bookmark routes * fix: guard meilisearch client init with mutex * fix: fix propagation of last error in restate * fix: don't fail invocations when the job fails * fix: add a timeout around the worker runner logic * fix: add leases to handle dangling semaphores * feat: separate dispatchers and runners * add a test * fix silent promise failure --- packages/plugins/queue-restate/src/index.ts | 39 +++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 11 deletions(-) (limited to 'packages/plugins/queue-restate/src/index.ts') diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index 07082955..64572b03 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -17,7 +17,7 @@ import { AdminClient } from "./admin"; import { envConfig } from "./env"; import { idProvider } from "./idProvider"; import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; +import { buildRestateServices } from "./service"; class RestateQueueWrapper implements Queue { constructor( @@ -87,12 +87,13 @@ class RestateQueueWrapper implements Queue { class RestateRunnerWrapper implements Runner { constructor( - private readonly wf: restate.ServiceDefinition< + private readonly dispatcherDef: restate.ServiceDefinition< string, { run: (ctx: restate.Context, data: T) => Promise; } >, + private readonly runnerDef: restate.ServiceDefinition, ) {} async run(): Promise { @@ -107,8 +108,12 @@ class RestateRunnerWrapper implements Runner { throw new Error("Method not implemented."); } - get def(): restate.WorkflowDefinition { - return this.wf; + get dispatcherService(): restate.ServiceDefinition { + return this.dispatcherDef; + } + + get runnerService(): restate.ServiceDefinition { + return this.runnerDef; } } @@ -130,14 +135,24 @@ class RestateQueueClient implements QueueClient { } async start(): Promise { + const servicesToExpose: restate.ServiceDefinition[] = []; + + for (const svc of this.services.values()) { + if (envConfig.RESTATE_ENABLE_DISPATCHERS) { + servicesToExpose.push(svc.dispatcherService); + } + if (envConfig.RESTATE_ENABLE_RUNNERS) { + servicesToExpose.push(svc.runnerService); + } + } + + if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) { + servicesToExpose.push(semaphore, idProvider); + } + const port = await restate.serve({ port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - ...(envConfig.RESTATE_EXPOSE_CORE_SERVICES - ? [semaphore, idProvider] - : []), - ], + services: servicesToExpose, identityKeys: envConfig.RESTATE_PUB_KEY ? [envConfig.RESTATE_PUB_KEY] : undefined, @@ -176,8 +191,10 @@ class RestateQueueClient implements QueueClient { if (wrapper) { throw new Error(`Queue ${name} already exists`); } + const services = buildRestateServices(queue, funcs, opts, queue.opts); const svc = new RestateRunnerWrapper( - buildRestateService(queue, funcs, opts, queue.opts), + services.dispatcher, + services.runner, ); this.services.set(name, svc); return svc; -- cgit v1.2.3-70-g09d2