diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-08 14:50:00 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-08 14:50:00 +0000 |
| commit | 99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch) | |
| tree | 73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins/queue-restate/src/index.ts | |
| parent | 737b03172c2e063ba311c23d6552418bd2ab1955 (diff) | |
| download | karakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst | |
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory
- Create new `packages/plugins` directory with consolidated package.json
- Move queue-liteque, queue-restate, and search-meilisearch to subdirectories
- Update imports in packages/shared-server/src/plugins.ts
- Remove individual plugin package directories
- Update shared-server dependency to use @karakeep/plugins
This reduces overhead of maintaining multiple separate packages for plugins.
* refactor: consolidate plugin config files to root level
- Move .oxlintrc.json to packages/plugins root
- Move vitest.config.ts to packages/plugins root
- Update vitest config paths to work from root
- Remove individual config files from plugin subdirectories
This reduces configuration duplication across plugin subdirectories.
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins/queue-restate/src/index.ts')
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 201 |
1 files changed, 201 insertions, 0 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts new file mode 100644 index 00000000..bedc26af --- /dev/null +++ b/packages/plugins/queue-restate/src/index.ts @@ -0,0 +1,201 @@ +import * as restate from "@restatedev/restate-sdk"; +import * as restateClient from "@restatedev/restate-sdk-clients"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import logger from "@karakeep/shared/logger"; + +import { AdminClient } from "./admin"; +import { envConfig } from "./env"; +import { idProvider } from "./idProvider"; +import { semaphore } from "./semaphore"; +import { buildRestateService } from "./service"; + +class RestateQueueWrapper<T> implements Queue<T> { + constructor( + private readonly _name: string, + private readonly client: restateClient.Ingress, + private readonly adminClient: AdminClient, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise<string | undefined> { + interface MyService { + run: ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => Promise<void>; + } + const cl = this.client.serviceSendClient<MyService>({ name: this.name() }); + const res = await cl.run( + { + payload, + priority: options?.priority ?? 0, + }, + restateClient.rpc.sendOpts({ + delay: options?.delayMs + ? { + milliseconds: options.delayMs, + } + : undefined, + idempotencyKey: options?.idempotencyKey, + }), + ); + return res.invocationId; + } + + async stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }> { + const res = await this.adminClient.getStats(this.name()); + return { + pending: res.pending + res.ready, + pending_retry: res["backing-off"] + res.paused + res.suspended, + running: res.running, + failed: 0, + }; + } + + async cancelAllNonRunning(): Promise<number> { + throw new Error("Method not implemented."); + } +} + +class RestateRunnerWrapper<T> implements Runner<T> { + constructor( + private readonly wf: restate.ServiceDefinition< + string, + { + run: (ctx: restate.Context, data: T) => Promise<void>; + } + >, + ) {} + + async run(): Promise<void> { + // No-op for restate + } + + async stop(): Promise<void> { + // No-op for restate + } + + async runUntilEmpty(): Promise<void> { + throw new Error("Method not implemented."); + } + + get def(): restate.WorkflowDefinition<string, unknown> { + return this.wf; + } +} + +class RestateQueueClient implements QueueClient { + private client: restateClient.Ingress; + private adminClient: AdminClient; + private queues = new Map<string, RestateQueueWrapper<unknown>>(); + private services = new Map<string, RestateRunnerWrapper<unknown>>(); + + constructor() { + this.client = restateClient.connect({ + url: envConfig.RESTATE_INGRESS_ADDR, + }); + this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); + } + + async prepare(): Promise<void> { + // No-op for restate + } + + async start(): Promise<void> { + const port = await restate.serve({ + port: envConfig.RESTATE_LISTEN_PORT ?? 0, + services: [ + ...[...this.services.values()].map((svc) => svc.def), + semaphore, + idProvider, + ], + identityKeys: envConfig.RESTATE_PUB_KEY + ? [envConfig.RESTATE_PUB_KEY] + : undefined, + logger: (meta, msg) => { + if (meta.context) { + // No need to log invocation logs + } else { + logger.log(meta.level, `[restate] ${msg}`); + } + }, + }); + logger.info(`Restate listening on port ${port}`); + } + + createQueue<T>(name: string, opts: QueueOptions): Queue<T> { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const wrapper = new RestateQueueWrapper<T>( + name, + this.client, + this.adminClient, + opts, + ); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + ): Runner<T> { + const name = queue.name(); + let wrapper = this.services.get(name); + if (wrapper) { + throw new Error(`Queue ${name} already exists`); + } + const svc = new RestateRunnerWrapper<T>( + buildRestateService(queue, funcs, opts, queue.opts), + ); + this.services.set(name, svc); + return svc; + } + + async shutdown(): Promise<void> { + // No-op for sqlite + } +} + +export class RestateQueueProvider implements PluginProvider<QueueClient> { + private client: QueueClient | null = null; + + static isConfigured(): boolean { + return envConfig.RESTATE_LISTEN_PORT !== undefined; + } + + async getClient(): Promise<QueueClient | null> { + if (!this.client) { + const client = new RestateQueueClient(); + this.client = client; + } + return this.client; + } +} |
