From 99413db0e79a156a1b87eacd3c6a7b83e9df946e Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 8 Nov 2025 14:50:00 +0000 Subject: refactor: consolidate multiple karakeep plugins into one package (#2101) * refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude --- packages/plugins/queue-restate/src/index.ts | 201 ++++++++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 packages/plugins/queue-restate/src/index.ts (limited to 'packages/plugins/queue-restate/src/index.ts') diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts new file mode 100644 index 00000000..bedc26af --- /dev/null +++ b/packages/plugins/queue-restate/src/index.ts @@ -0,0 +1,201 @@ +import * as restate from "@restatedev/restate-sdk"; +import * as restateClient from "@restatedev/restate-sdk-clients"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import logger from "@karakeep/shared/logger"; + +import { AdminClient } from "./admin"; +import { envConfig } from "./env"; +import { idProvider } from "./idProvider"; +import { semaphore } from "./semaphore"; +import { buildRestateService } from "./service"; + +class RestateQueueWrapper implements Queue { + constructor( + private readonly _name: string, + private readonly client: restateClient.Ingress, + private readonly adminClient: AdminClient, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise { + interface MyService { + run: ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => Promise; + } + const cl = this.client.serviceSendClient({ name: this.name() }); + const res = await cl.run( + { + payload, + priority: options?.priority ?? 0, + }, + restateClient.rpc.sendOpts({ + delay: options?.delayMs + ? { + milliseconds: options.delayMs, + } + : undefined, + idempotencyKey: options?.idempotencyKey, + }), + ); + return res.invocationId; + } + + async stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }> { + const res = await this.adminClient.getStats(this.name()); + return { + pending: res.pending + res.ready, + pending_retry: res["backing-off"] + res.paused + res.suspended, + running: res.running, + failed: 0, + }; + } + + async cancelAllNonRunning(): Promise { + throw new Error("Method not implemented."); + } +} + +class RestateRunnerWrapper implements Runner { + constructor( + private readonly wf: restate.ServiceDefinition< + string, + { + run: (ctx: restate.Context, data: T) => Promise; + } + >, + ) {} + + async run(): Promise { + // No-op for restate + } + + async stop(): Promise { + // No-op for restate + } + + async runUntilEmpty(): Promise { + throw new Error("Method not implemented."); + } + + get def(): restate.WorkflowDefinition { + return this.wf; + } +} + +class RestateQueueClient implements QueueClient { + private client: restateClient.Ingress; + private adminClient: AdminClient; + private queues = new Map>(); + private services = new Map>(); + + constructor() { + this.client = restateClient.connect({ + url: envConfig.RESTATE_INGRESS_ADDR, + }); + this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); + } + + async prepare(): Promise { + // No-op for restate + } + + async start(): Promise { + const port = await restate.serve({ + port: envConfig.RESTATE_LISTEN_PORT ?? 0, + services: [ + ...[...this.services.values()].map((svc) => svc.def), + semaphore, + idProvider, + ], + identityKeys: envConfig.RESTATE_PUB_KEY + ? [envConfig.RESTATE_PUB_KEY] + : undefined, + logger: (meta, msg) => { + if (meta.context) { + // No need to log invocation logs + } else { + logger.log(meta.level, `[restate] ${msg}`); + } + }, + }); + logger.info(`Restate listening on port ${port}`); + } + + createQueue(name: string, opts: QueueOptions): Queue { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const wrapper = new RestateQueueWrapper( + name, + this.client, + this.adminClient, + opts, + ); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner { + const name = queue.name(); + let wrapper = this.services.get(name); + if (wrapper) { + throw new Error(`Queue ${name} already exists`); + } + const svc = new RestateRunnerWrapper( + buildRestateService(queue, funcs, opts, queue.opts), + ); + this.services.set(name, svc); + return svc; + } + + async shutdown(): Promise { + // No-op for sqlite + } +} + +export class RestateQueueProvider implements PluginProvider { + private client: QueueClient | null = null; + + static isConfigured(): boolean { + return envConfig.RESTATE_LISTEN_PORT !== undefined; + } + + async getClient(): Promise { + if (!this.client) { + const client = new RestateQueueClient(); + this.client = client; + } + return this.client; + } +} -- cgit v1.2.3-70-g09d2