diff options
Diffstat (limited to '')
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 42 |
1 files changed, 32 insertions, 10 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index fa636f87..f8761291 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -17,7 +17,7 @@ import { AdminClient } from "./admin"; import { envConfig } from "./env"; import { idProvider } from "./idProvider"; import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; +import { buildRestateServices } from "./service"; class RestateQueueWrapper<T> implements Queue<T> { constructor( @@ -27,6 +27,10 @@ class RestateQueueWrapper<T> implements Queue<T> { public readonly opts: QueueOptions, ) {} + ensureInit(): Promise<void> { + return Promise.resolve(); + } + name(): string { return this._name; } @@ -87,12 +91,13 @@ class RestateQueueWrapper<T> implements Queue<T> { class RestateRunnerWrapper<T> implements Runner<T> { constructor( - private readonly wf: restate.ServiceDefinition< + private readonly dispatcherDef: restate.ServiceDefinition< string, { run: (ctx: restate.Context, data: T) => Promise<void>; } >, + private readonly runnerDef: restate.ServiceDefinition<string, unknown>, ) {} async run(): Promise<void> { @@ -107,8 +112,12 @@ class RestateRunnerWrapper<T> implements Runner<T> { throw new Error("Method not implemented."); } - get def(): restate.WorkflowDefinition<string, unknown> { - return this.wf; + get dispatcherService(): restate.ServiceDefinition<string, unknown> { + return this.dispatcherDef; + } + + get runnerService(): restate.ServiceDefinition<string, unknown> { + return this.runnerDef; } } @@ -130,13 +139,24 @@ class RestateQueueClient implements QueueClient { } async start(): Promise<void> { + const servicesToExpose: restate.ServiceDefinition<string, unknown>[] = []; + + for (const svc of this.services.values()) { + if (envConfig.RESTATE_ENABLE_DISPATCHERS) { + servicesToExpose.push(svc.dispatcherService); + } + if (envConfig.RESTATE_ENABLE_RUNNERS) { + servicesToExpose.push(svc.runnerService); + } + } + + if (envConfig.RESTATE_EXPOSE_CORE_SERVICES) { + servicesToExpose.push(semaphore, idProvider); + } + const port = await restate.serve({ port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - semaphore, - idProvider, - ], + services: servicesToExpose, identityKeys: envConfig.RESTATE_PUB_KEY ? [envConfig.RESTATE_PUB_KEY] : undefined, @@ -175,8 +195,10 @@ class RestateQueueClient implements QueueClient { if (wrapper) { throw new Error(`Queue ${name} already exists`); } + const services = buildRestateServices(queue, funcs, opts, queue.opts); const svc = new RestateRunnerWrapper<T>( - buildRestateService(queue, funcs, opts, queue.opts), + services.dispatcher, + services.runner, ); this.services.set(name, svc); return svc; |
