diff options
Diffstat (limited to 'packages/plugins/queue-restate/src/index.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 39 |
1 files changed, 28 insertions, 11 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index 07082955..64572b03 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -17,7 +17,7 @@ import { AdminClient } from "./admin"; import { envConfig } from "./env"; import { idProvider } from "./idProvider"; import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; +import { buildRestateServices } from "./service"; class RestateQueueWrapper<T> implements Queue<T> { constructor( @@ -87,12 +87,13 @@ class RestateQueueWrapper<T> implements Queue<T> { class RestateRunnerWrapper<T> implements Runner<T> { constructor( - private readonly wf: restate.ServiceDefinition< + private readonly dispatcherDef: restate.ServiceDefinition< string, { run: (ctx: restate.Context, data: T) => Promise<void>; } >, + private readonly runnerDef: restate.ServiceDefinition<string, unknown>, ) {} async run(): Promise<void> { @@ -107,8 +108,12 @@ class RestateRunnerWrapper<T> implements Runner<T> { throw new Error("Method not implemented."); } - get def(): restate.WorkflowDefinition<string, unknown> { - return this.wf; + get dispatcherService(): restate.ServiceDefinition<string, unknown> { + return this.dispatcherDef; + } + + get runnerService(): restate.ServiceDefinition<string, unknown> { + return this.runnerDef; } } @@ -130,14 +135,24 @@ class RestateQueueClient implements QueueClient { } async start(): Promise<void> { + const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = []; + + for (const svc of this.services.values()) { + if (envConfig.RESTATE_ENABLE_DISPATCHERS) { + servicesToExpose.push(svc.dispatcherService); + } + if (envConfig.RESTATE_ENABLE_RUNNERS) { + servicesToExpose.push(svc.runnerService); + } + } + + if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) { + servicesToExpose.push(semaphore, idProvider); + } + const port = await restate.serve({ port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - ...(envConfig.RESTATE_EXPOSE_CORE_SERVICES - ? [semaphore, idProvider] - : []), - ], + services: servicesToExpose, identityKeys: envConfig.RESTATE_PUB_KEY ? [envConfig.RESTATE_PUB_KEY] : undefined, @@ -176,8 +191,10 @@ class RestateQueueClient implements QueueClient { if (wrapper) { throw new Error(`Queue ${name} already exists`); } + const services = buildRestateServices(queue, funcs, opts, queue.opts); const svc = new RestateRunnerWrapper<T>( - buildRestateService(queue, funcs, opts, queue.opts), + services.dispatcher, + services.runner, ); this.services.set(name, svc); return svc; |
