aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins/queue-restate/src/index.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-08 14:50:00 +0000
committerGitHub <noreply@github.com>2025-11-08 14:50:00 +0000
commit99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch)
tree73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins/queue-restate/src/index.ts
parent737b03172c2e063ba311c23d6552418bd2ab1955 (diff)
downloadkarakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins/queue-restate/src/index.ts')
-rw-r--r--packages/plugins/queue-restate/src/index.ts201
1 files changed, 201 insertions, 0 deletions
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
new file mode 100644
index 00000000..bedc26af
--- /dev/null
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -0,0 +1,201 @@
+import * as restate from "@restatedev/restate-sdk";
+import * as restateClient from "@restatedev/restate-sdk-clients";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+
+import { AdminClient } from "./admin";
+import { envConfig } from "./env";
+import { idProvider } from "./idProvider";
+import { semaphore } from "./semaphore";
+import { buildRestateService } from "./service";
+
+class RestateQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly client: restateClient.Ingress,
+ private readonly adminClient: AdminClient,
+ public readonly opts: QueueOptions,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ interface MyService {
+ run: (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => Promise<void>;
+ }
+ const cl = this.client.serviceSendClient<MyService>({ name: this.name() });
+ const res = await cl.run(
+ {
+ payload,
+ priority: options?.priority ?? 0,
+ },
+ restateClient.rpc.sendOpts({
+ delay: options?.delayMs
+ ? {
+ milliseconds: options.delayMs,
+ }
+ : undefined,
+ idempotencyKey: options?.idempotencyKey,
+ }),
+ );
+ return res.invocationId;
+ }
+
+ async stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }> {
+ const res = await this.adminClient.getStats(this.name());
+ return {
+ pending: res.pending + res.ready,
+ pending_retry: res["backing-off"] + res.paused + res.suspended,
+ running: res.running,
+ failed: 0,
+ };
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ throw new Error("Method not implemented.");
+ }
+}
+
+class RestateRunnerWrapper<T> implements Runner<T> {
+ constructor(
+ private readonly wf: restate.ServiceDefinition<
+ string,
+ {
+ run: (ctx: restate.Context, data: T) => Promise<void>;
+ }
+ >,
+ ) {}
+
+ async run(): Promise<void> {
+ // No-op for restate
+ }
+
+ async stop(): Promise<void> {
+ // No-op for restate
+ }
+
+ async runUntilEmpty(): Promise<void> {
+ throw new Error("Method not implemented.");
+ }
+
+ get def(): restate.WorkflowDefinition<string, unknown> {
+ return this.wf;
+ }
+}
+
+class RestateQueueClient implements QueueClient {
+ private client: restateClient.Ingress;
+ private adminClient: AdminClient;
+ private queues = new Map<string, RestateQueueWrapper<unknown>>();
+ private services = new Map<string, RestateRunnerWrapper<unknown>>();
+
+ constructor() {
+ this.client = restateClient.connect({
+ url: envConfig.RESTATE_INGRESS_ADDR,
+ });
+ this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
+ }
+
+ async prepare(): Promise<void> {
+ // No-op for restate
+ }
+
+ async start(): Promise<void> {
+ const port = await restate.serve({
+ port: envConfig.RESTATE_LISTEN_PORT ?? 0,
+ services: [
+ ...[...this.services.values()].map((svc) => svc.def),
+ semaphore,
+ idProvider,
+ ],
+ identityKeys: envConfig.RESTATE_PUB_KEY
+ ? [envConfig.RESTATE_PUB_KEY]
+ : undefined,
+ logger: (meta, msg) => {
+ if (meta.context) {
+ // No need to log invocation logs
+ } else {
+ logger.log(meta.level, `[restate] ${msg}`);
+ }
+ },
+ });
+ logger.info(`Restate listening on port ${port}`);
+ }
+
+ createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const wrapper = new RestateQueueWrapper<T>(
+ name,
+ this.client,
+ this.adminClient,
+ opts,
+ );
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.services.get(name);
+ if (wrapper) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const svc = new RestateRunnerWrapper<T>(
+ buildRestateService(queue, funcs, opts, queue.opts),
+ );
+ this.services.set(name, svc);
+ return svc;
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class RestateQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ static isConfigured(): boolean {
+ return envConfig.RESTATE_LISTEN_PORT !== undefined;
+ }
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new RestateQueueClient();
+ this.client = client;
+ }
+ return this.client;
+ }
+}