diff options
Diffstat (limited to 'packages/plugins-queue-restate/src/index.ts')
| -rw-r--r-- | packages/plugins-queue-restate/src/index.ts | 201 |
1 files changed, 0 insertions, 201 deletions
diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts deleted file mode 100644 index bedc26af..00000000 --- a/packages/plugins-queue-restate/src/index.ts +++ /dev/null @@ -1,201 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; -import * as restateClient from "@restatedev/restate-sdk-clients"; - -import type { PluginProvider } from "@karakeep/shared/plugins"; -import type { - EnqueueOptions, - Queue, - QueueClient, - QueueOptions, - Runner, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import logger from "@karakeep/shared/logger"; - -import { AdminClient } from "./admin"; -import { envConfig } from "./env"; -import { idProvider } from "./idProvider"; -import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; - -class RestateQueueWrapper<T> implements Queue<T> { - constructor( - private readonly _name: string, - private readonly client: restateClient.Ingress, - private readonly adminClient: AdminClient, - public readonly opts: QueueOptions, - ) {} - - name(): string { - return this._name; - } - - async enqueue( - payload: T, - options?: EnqueueOptions, - ): Promise<string | undefined> { - interface MyService { - run: ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => Promise<void>; - } - const cl = this.client.serviceSendClient<MyService>({ name: this.name() }); - const res = await cl.run( - { - payload, - priority: options?.priority ?? 0, - }, - restateClient.rpc.sendOpts({ - delay: options?.delayMs - ? { - milliseconds: options.delayMs, - } - : undefined, - idempotencyKey: options?.idempotencyKey, - }), - ); - return res.invocationId; - } - - async stats(): Promise<{ - pending: number; - pending_retry: number; - running: number; - failed: number; - }> { - const res = await this.adminClient.getStats(this.name()); - return { - pending: res.pending + res.ready, - pending_retry: res["backing-off"] + res.paused + res.suspended, - running: res.running, - failed: 0, - }; - } - - async cancelAllNonRunning(): Promise<number> { - throw new Error("Method not implemented."); - } -} - -class RestateRunnerWrapper<T> implements Runner<T> { - constructor( - private readonly wf: restate.ServiceDefinition< - string, - { - run: (ctx: restate.Context, data: T) => Promise<void>; - } - >, - ) {} - - async run(): Promise<void> { - // No-op for restate - } - - async stop(): Promise<void> { - // No-op for restate - } - - async runUntilEmpty(): Promise<void> { - throw new Error("Method not implemented."); - } - - get def(): restate.WorkflowDefinition<string, unknown> { - return this.wf; - } -} - -class RestateQueueClient implements QueueClient { - private client: restateClient.Ingress; - private adminClient: AdminClient; - private queues = new Map<string, RestateQueueWrapper<unknown>>(); - private services = new Map<string, RestateRunnerWrapper<unknown>>(); - - constructor() { - this.client = restateClient.connect({ - url: envConfig.RESTATE_INGRESS_ADDR, - }); - this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); - } - - async prepare(): Promise<void> { - // No-op for restate - } - - async start(): Promise<void> { - const port = await restate.serve({ - port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - semaphore, - idProvider, - ], - identityKeys: envConfig.RESTATE_PUB_KEY - ? [envConfig.RESTATE_PUB_KEY] - : undefined, - logger: (meta, msg) => { - if (meta.context) { - // No need to log invocation logs - } else { - logger.log(meta.level, `[restate] ${msg}`); - } - }, - }); - logger.info(`Restate listening on port ${port}`); - } - - createQueue<T>(name: string, opts: QueueOptions): Queue<T> { - if (this.queues.has(name)) { - throw new Error(`Queue ${name} already exists`); - } - const wrapper = new RestateQueueWrapper<T>( - name, - this.client, - this.adminClient, - opts, - ); - this.queues.set(name, wrapper); - return wrapper; - } - - createRunner<T>( - queue: Queue<T>, - funcs: RunnerFuncs<T>, - opts: RunnerOptions<T>, - ): Runner<T> { - const name = queue.name(); - let wrapper = this.services.get(name); - if (wrapper) { - throw new Error(`Queue ${name} already exists`); - } - const svc = new RestateRunnerWrapper<T>( - buildRestateService(queue, funcs, opts, queue.opts), - ); - this.services.set(name, svc); - return svc; - } - - async shutdown(): Promise<void> { - // No-op for sqlite - } -} - -export class RestateQueueProvider implements PluginProvider<QueueClient> { - private client: QueueClient | null = null; - - static isConfigured(): boolean { - return envConfig.RESTATE_LISTEN_PORT !== undefined; - } - - async getClient(): Promise<QueueClient | null> { - if (!this.client) { - const client = new RestateQueueClient(); - this.client = client; - } - return this.client; - } -} |
