diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-01-10 15:31:30 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-01-10 15:31:30 +0000 |
| commit | f48e98e16ae588ee5004531bf9a5aed757ed3786 (patch) | |
| tree | fc3b9ca6f0512fef90124e45cbe59dd4c305d5e7 /packages/plugins/queue-restate/src/index.ts | |
| parent | aace8864d7eab5c858a92064b0ac59c122377830 (diff) | |
| download | karakeep-f48e98e16ae588ee5004531bf9a5aed757ed3786.tar.zst | |
fix: harden the restate implementation (#2370)
* fix: parallelize queue enqueues in bookmark routes
* fix: guard meilisearch client init with mutex
* fix: fix propagation of last error in restate
* fix: don't fail invocations when the job fails
* fix: add a timeout around the worker runner logic
* fix: add leases to handle dangling semaphores
* feat: separate dispatchers and runners
* add a test
* fix silent promise failure
Diffstat (limited to 'packages/plugins/queue-restate/src/index.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 39 |
1 files changed, 28 insertions, 11 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index 07082955..64572b03 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -17,7 +17,7 @@ import { AdminClient } from "./admin"; import { envConfig } from "./env"; import { idProvider } from "./idProvider"; import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; +import { buildRestateServices } from "./service"; class RestateQueueWrapper<T> implements Queue<T> { constructor( @@ -87,12 +87,13 @@ class RestateQueueWrapper<T> implements Queue<T> { class RestateRunnerWrapper<T> implements Runner<T> { constructor( - private readonly wf: restate.ServiceDefinition< + private readonly dispatcherDef: restate.ServiceDefinition< string, { run: (ctx: restate.Context, data: T) => Promise<void>; } >, + private readonly runnerDef: restate.ServiceDefinition<string, unknown>, ) {} async run(): Promise<void> { @@ -107,8 +108,12 @@ class RestateRunnerWrapper<T> implements Runner<T> { throw new Error("Method not implemented."); } - get def(): restate.WorkflowDefinition<string, unknown> { - return this.wf; + get dispatcherService(): restate.ServiceDefinition<string, unknown> { + return this.dispatcherDef; + } + + get runnerService(): restate.ServiceDefinition<string, unknown> { + return this.runnerDef; } } @@ -130,14 +135,24 @@ class RestateQueueClient implements QueueClient { } async start(): Promise<void> { + const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = []; + + for (const svc of this.services.values()) { + if (envConfig.RESTATE_ENABLE_DISPATCHERS) { + servicesToExpose.push(svc.dispatcherService); + } + if (envConfig.RESTATE_ENABLE_RUNNERS) { + servicesToExpose.push(svc.runnerService); + } + } + + if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) { + servicesToExpose.push(semaphore, idProvider); + } + const port = await restate.serve({ port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - ...(envConfig.RESTATE_EXPOSE_CORE_SERVICES - ? [semaphore, idProvider] - : []), - ], + services: servicesToExpose, identityKeys: envConfig.RESTATE_PUB_KEY ? [envConfig.RESTATE_PUB_KEY] : undefined, @@ -176,8 +191,10 @@ class RestateQueueClient implements QueueClient { if (wrapper) { throw new Error(`Queue ${name} already exists`); } + const services = buildRestateServices(queue, funcs, opts, queue.opts); const svc = new RestateRunnerWrapper<T>( - buildRestateService(queue, funcs, opts, queue.opts), + services.dispatcher, + services.runner, ); this.services.set(name, svc); return svc; |
