aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-restate/src/index.ts
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins-queue-restate/src/index.ts')
-rw-r--r--packages/plugins-queue-restate/src/index.ts201
1 files changed, 0 insertions, 201 deletions
diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts
deleted file mode 100644
index bedc26af..00000000
--- a/packages/plugins-queue-restate/src/index.ts
+++ /dev/null
@@ -1,201 +0,0 @@
-import * as restate from "@restatedev/restate-sdk";
-import * as restateClient from "@restatedev/restate-sdk-clients";
-
-import type { PluginProvider } from "@karakeep/shared/plugins";
-import type {
- EnqueueOptions,
- Queue,
- QueueClient,
- QueueOptions,
- Runner,
- RunnerFuncs,
- RunnerOptions,
-} from "@karakeep/shared/queueing";
-import logger from "@karakeep/shared/logger";
-
-import { AdminClient } from "./admin";
-import { envConfig } from "./env";
-import { idProvider } from "./idProvider";
-import { semaphore } from "./semaphore";
-import { buildRestateService } from "./service";
-
-class RestateQueueWrapper<T> implements Queue<T> {
- constructor(
- private readonly _name: string,
- private readonly client: restateClient.Ingress,
- private readonly adminClient: AdminClient,
- public readonly opts: QueueOptions,
- ) {}
-
- name(): string {
- return this._name;
- }
-
- async enqueue(
- payload: T,
- options?: EnqueueOptions,
- ): Promise<string | undefined> {
- interface MyService {
- run: (
- ctx: restate.Context,
- data: {
- payload: T;
- priority: number;
- },
- ) => Promise<void>;
- }
- const cl = this.client.serviceSendClient<MyService>({ name: this.name() });
- const res = await cl.run(
- {
- payload,
- priority: options?.priority ?? 0,
- },
- restateClient.rpc.sendOpts({
- delay: options?.delayMs
- ? {
- milliseconds: options.delayMs,
- }
- : undefined,
- idempotencyKey: options?.idempotencyKey,
- }),
- );
- return res.invocationId;
- }
-
- async stats(): Promise<{
- pending: number;
- pending_retry: number;
- running: number;
- failed: number;
- }> {
- const res = await this.adminClient.getStats(this.name());
- return {
- pending: res.pending + res.ready,
- pending_retry: res["backing-off"] + res.paused + res.suspended,
- running: res.running,
- failed: 0,
- };
- }
-
- async cancelAllNonRunning(): Promise<number> {
- throw new Error("Method not implemented.");
- }
-}
-
-class RestateRunnerWrapper<T> implements Runner<T> {
- constructor(
- private readonly wf: restate.ServiceDefinition<
- string,
- {
- run: (ctx: restate.Context, data: T) => Promise<void>;
- }
- >,
- ) {}
-
- async run(): Promise<void> {
- // No-op for restate
- }
-
- async stop(): Promise<void> {
- // No-op for restate
- }
-
- async runUntilEmpty(): Promise<void> {
- throw new Error("Method not implemented.");
- }
-
- get def(): restate.WorkflowDefinition<string, unknown> {
- return this.wf;
- }
-}
-
-class RestateQueueClient implements QueueClient {
- private client: restateClient.Ingress;
- private adminClient: AdminClient;
- private queues = new Map<string, RestateQueueWrapper<unknown>>();
- private services = new Map<string, RestateRunnerWrapper<unknown>>();
-
- constructor() {
- this.client = restateClient.connect({
- url: envConfig.RESTATE_INGRESS_ADDR,
- });
- this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
- }
-
- async prepare(): Promise<void> {
- // No-op for restate
- }
-
- async start(): Promise<void> {
- const port = await restate.serve({
- port: envConfig.RESTATE_LISTEN_PORT ?? 0,
- services: [
- ...[...this.services.values()].map((svc) => svc.def),
- semaphore,
- idProvider,
- ],
- identityKeys: envConfig.RESTATE_PUB_KEY
- ? [envConfig.RESTATE_PUB_KEY]
- : undefined,
- logger: (meta, msg) => {
- if (meta.context) {
- // No need to log invocation logs
- } else {
- logger.log(meta.level, `[restate] ${msg}`);
- }
- },
- });
- logger.info(`Restate listening on port ${port}`);
- }
-
- createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
- if (this.queues.has(name)) {
- throw new Error(`Queue ${name} already exists`);
- }
- const wrapper = new RestateQueueWrapper<T>(
- name,
- this.client,
- this.adminClient,
- opts,
- );
- this.queues.set(name, wrapper);
- return wrapper;
- }
-
- createRunner<T>(
- queue: Queue<T>,
- funcs: RunnerFuncs<T>,
- opts: RunnerOptions<T>,
- ): Runner<T> {
- const name = queue.name();
- let wrapper = this.services.get(name);
- if (wrapper) {
- throw new Error(`Queue ${name} already exists`);
- }
- const svc = new RestateRunnerWrapper<T>(
- buildRestateService(queue, funcs, opts, queue.opts),
- );
- this.services.set(name, svc);
- return svc;
- }
-
- async shutdown(): Promise<void> {
- // No-op for sqlite
- }
-}
-
-export class RestateQueueProvider implements PluginProvider<QueueClient> {
- private client: QueueClient | null = null;
-
- static isConfigured(): boolean {
- return envConfig.RESTATE_LISTEN_PORT !== undefined;
- }
-
- async getClient(): Promise<QueueClient | null> {
- if (!this.client) {
- const client = new RestateQueueClient();
- this.client = client;
- }
- return this.client;
- }
-}