aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins
diff options
context:
space:
mode:
Diffstat (limited to 'packages/plugins')
-rw-r--r--packages/plugins/.oxlintrc.json19
-rw-r--r--packages/plugins/package.json34
-rw-r--r--packages/plugins/queue-liteque/index.ts10
-rw-r--r--packages/plugins/queue-liteque/src/index.ts137
-rw-r--r--packages/plugins/queue-restate/index.ts12
-rw-r--r--packages/plugins/queue-restate/src/admin.ts75
-rw-r--r--packages/plugins/queue-restate/src/env.ts13
-rw-r--r--packages/plugins/queue-restate/src/idProvider.ts21
-rw-r--r--packages/plugins/queue-restate/src/index.ts201
-rw-r--r--packages/plugins/queue-restate/src/semaphore.ts109
-rw-r--r--packages/plugins/queue-restate/src/service.ts133
-rw-r--r--packages/plugins/queue-restate/src/tests/docker-compose.yml8
-rw-r--r--packages/plugins/queue-restate/src/tests/queue.test.ts221
-rw-r--r--packages/plugins/queue-restate/src/tests/setup/startContainers.ts90
-rw-r--r--packages/plugins/queue-restate/src/tests/utils.ts23
-rw-r--r--packages/plugins/search-meilisearch/index.ts12
-rw-r--r--packages/plugins/search-meilisearch/src/env.ts8
-rw-r--r--packages/plugins/search-meilisearch/src/index.ts159
-rw-r--r--packages/plugins/tsconfig.json9
-rw-r--r--packages/plugins/vitest.config.ts14
20 files changed, 1308 insertions, 0 deletions
diff --git a/packages/plugins/.oxlintrc.json b/packages/plugins/.oxlintrc.json
new file mode 100644
index 00000000..79ba0255
--- /dev/null
+++ b/packages/plugins/.oxlintrc.json
@@ -0,0 +1,19 @@
+{
+ "$schema": "../../node_modules/oxlint/configuration_schema.json",
+ "extends": [
+ "../../tooling/oxlint/oxlint-base.json"
+ ],
+ "env": {
+ "builtin": true,
+ "commonjs": true
+ },
+ "ignorePatterns": [
+ "**/*.config.js",
+ "**/*.config.cjs",
+ "**/.eslintrc.cjs",
+ "**/.next",
+ "**/dist",
+ "**/build",
+ "**/pnpm-lock.yaml"
+ ]
+}
diff --git a/packages/plugins/package.json b/packages/plugins/package.json
new file mode 100644
index 00000000..8b3f73f7
--- /dev/null
+++ b/packages/plugins/package.json
@@ -0,0 +1,34 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@karakeep/plugins",
+ "version": "0.1.0",
+ "private": true,
+ "type": "module",
+ "exports": {
+ "./queue-liteque": "./queue-liteque/index.ts",
+ "./queue-restate": "./queue-restate/index.ts",
+ "./search-meilisearch": "./search-meilisearch/index.ts"
+ },
+ "scripts": {
+ "typecheck": "tsc --noEmit",
+ "format": "prettier . --cache --ignore-path ../../.prettierignore --check",
+ "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write",
+ "lint": "oxlint .",
+ "lint:fix": "oxlint . --fix",
+ "test": "vitest"
+ },
+ "dependencies": {
+ "@karakeep/shared": "workspace:*",
+ "@restatedev/restate-sdk": "^1.9.0",
+ "@restatedev/restate-sdk-clients": "^1.9.0",
+ "liteque": "^0.6.2",
+ "meilisearch": "^0.45.0"
+ },
+ "devDependencies": {
+ "@karakeep/prettier-config": "workspace:^0.1.0",
+ "@karakeep/tsconfig": "workspace:^0.1.0",
+ "vite-tsconfig-paths": "^4.3.1",
+ "vitest": "^3.2.4"
+ },
+ "prettier": "@karakeep/prettier-config"
+}
diff --git a/packages/plugins/queue-liteque/index.ts b/packages/plugins/queue-liteque/index.ts
new file mode 100644
index 00000000..c3f7f03b
--- /dev/null
+++ b/packages/plugins/queue-liteque/index.ts
@@ -0,0 +1,10 @@
+// Auto-register the Liteque queue provider when this package is imported
+import { PluginManager, PluginType } from "@karakeep/shared/plugins";
+
+import { LitequeQueueProvider } from "./src";
+
+PluginManager.register({
+ type: PluginType.Queue,
+ name: "Liteque",
+ provider: new LitequeQueueProvider(),
+});
diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts
new file mode 100644
index 00000000..ddc2181c
--- /dev/null
+++ b/packages/plugins/queue-liteque/src/index.ts
@@ -0,0 +1,137 @@
+import path from "node:path";
+import {
+ buildDBClient,
+ SqliteQueue as LQ,
+ Runner as LQRunner,
+ migrateDB,
+} from "liteque";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ DequeuedJob,
+ DequeuedJobError,
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import serverConfig from "@karakeep/shared/config";
+
+class LitequeQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly lq: LQ<T>,
+ public readonly opts: QueueOptions,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ const job = await this.lq.enqueue(payload, options);
+ // liteque returns a Job with numeric id
+ return job ? String(job.id) : undefined;
+ }
+
+ async stats() {
+ return this.lq.stats();
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ return this.lq.cancelAllNonRunning();
+ }
+
+ // Internal accessor for runner
+ get _impl(): LQ<T> {
+ return this.lq;
+ }
+}
+
+class LitequeQueueClient implements QueueClient {
+ private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), {
+ walEnabled: serverConfig.database.walMode,
+ });
+
+ private queues = new Map<string, LitequeQueueWrapper<unknown>>();
+
+ async prepare(): Promise<void> {
+ migrateDB(this.db);
+ }
+
+ async start(): Promise<void> {
+ // No-op for sqlite
+ }
+
+ createQueue<T>(name: string, options: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const lq = new LQ<T>(name, this.db, {
+ defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries },
+ keepFailedJobs: options.keepFailedJobs,
+ });
+ const wrapper = new LitequeQueueWrapper<T>(name, lq, options);
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.queues.get(name);
+ if (!wrapper) {
+ throw new Error(`Queue ${name} not found`);
+ }
+
+ const runner = new LQRunner<T>(
+ wrapper._impl,
+ {
+ run: funcs.run,
+ onComplete: funcs.onComplete as
+ | ((job: DequeuedJob<T>) => Promise<void>)
+ | undefined,
+ onError: funcs.onError as
+ | ((job: DequeuedJobError<T>) => Promise<void>)
+ | undefined,
+ },
+ {
+ pollIntervalMs: opts.pollIntervalMs ?? 1000,
+ timeoutSecs: opts.timeoutSecs,
+ concurrency: opts.concurrency,
+ validator: opts.validator,
+ },
+ );
+
+ return {
+ run: () => runner.run(),
+ stop: () => runner.stop(),
+ runUntilEmpty: () => runner.runUntilEmpty(),
+ };
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class LitequeQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new LitequeQueueClient();
+ this.client = client;
+ }
+ return this.client;
+ }
+}
diff --git a/packages/plugins/queue-restate/index.ts b/packages/plugins/queue-restate/index.ts
new file mode 100644
index 00000000..d313615c
--- /dev/null
+++ b/packages/plugins/queue-restate/index.ts
@@ -0,0 +1,12 @@
+// Auto-register the Restate queue provider when this package is imported
+import { PluginManager, PluginType } from "@karakeep/shared/plugins";
+
+import { RestateQueueProvider } from "./src";
+
+if (RestateQueueProvider.isConfigured()) {
+ PluginManager.register({
+ type: PluginType.Queue,
+ name: "Restate",
+ provider: new RestateQueueProvider(),
+ });
+}
diff --git a/packages/plugins/queue-restate/src/admin.ts b/packages/plugins/queue-restate/src/admin.ts
new file mode 100644
index 00000000..dddc8f00
--- /dev/null
+++ b/packages/plugins/queue-restate/src/admin.ts
@@ -0,0 +1,75 @@
+import { z } from "zod";
+
+export class AdminClient {
+ constructor(private addr: string) {}
+
+ async upsertDeployment(deploymentAddr: string) {
+ const res = await fetch(`${this.addr}/deployments`, {
+ method: "POST",
+ body: JSON.stringify({
+ uri: deploymentAddr,
+ force: true,
+ }),
+ headers: {
+ "Content-Type": "application/json",
+ },
+ });
+
+ if (!res.ok) {
+ throw new Error(`Failed to upsert deployment: ${res.status}`);
+ }
+ }
+
+ async getStats(serviceName: string) {
+ const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`;
+ const res = await fetch(`${this.addr}/query`, {
+ method: "POST",
+ body: JSON.stringify({
+ query,
+ }),
+ headers: {
+ "Content-Type": "application/json",
+ Accept: "application/json",
+ },
+ });
+
+ if (!res.ok) {
+ throw new Error(`Failed to get stats: ${res.status}`);
+ }
+ const zStatus = z.enum([
+ "pending",
+ "scheduled",
+ "ready",
+ "running",
+ "paused",
+ "backing-off",
+ "suspended",
+ "completed",
+ ]);
+ const zSchema = z.object({
+ rows: z.array(
+ z.object({
+ status: zStatus,
+ count: z.number(),
+ }),
+ ),
+ });
+
+ return zSchema.parse(await res.json()).rows.reduce(
+ (acc, cur) => {
+ acc[cur.status] = cur.count;
+ return acc;
+ },
+ {
+ pending: 0,
+ scheduled: 0,
+ ready: 0,
+ running: 0,
+ paused: 0,
+ "backing-off": 0,
+ suspended: 0,
+ completed: 0,
+ } as Record<z.infer<typeof zStatus>, number>,
+ );
+ }
+}
diff --git a/packages/plugins/queue-restate/src/env.ts b/packages/plugins/queue-restate/src/env.ts
new file mode 100644
index 00000000..01175e86
--- /dev/null
+++ b/packages/plugins/queue-restate/src/env.ts
@@ -0,0 +1,13 @@
+import { z } from "zod";
+
+export const envConfig = z
+ .object({
+ RESTATE_LISTEN_PORT: z.coerce.number().optional(),
+ RESTATE_INGRESS_ADDR: z
+ .string()
+ .optional()
+ .default("http://localhost:8080"),
+ RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"),
+ RESTATE_PUB_KEY: z.string().optional(),
+ })
+ .parse(process.env);
diff --git a/packages/plugins/queue-restate/src/idProvider.ts b/packages/plugins/queue-restate/src/idProvider.ts
new file mode 100644
index 00000000..ee85f46f
--- /dev/null
+++ b/packages/plugins/queue-restate/src/idProvider.ts
@@ -0,0 +1,21 @@
+import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
+
+export const idProvider = object({
+ name: "IdProvider",
+ handlers: {
+ get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => {
+ const state = (await ctx.get("nextId")) ?? 0;
+ ctx.set("nextId", state + 1);
+ return state;
+ },
+ },
+ options: {
+ ingressPrivate: true,
+ },
+});
+
+export async function genId(ctx: Context) {
+ return ctx
+ .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider")
+ .get();
+}
diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts
new file mode 100644
index 00000000..bedc26af
--- /dev/null
+++ b/packages/plugins/queue-restate/src/index.ts
@@ -0,0 +1,201 @@
+import * as restate from "@restatedev/restate-sdk";
+import * as restateClient from "@restatedev/restate-sdk-clients";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import logger from "@karakeep/shared/logger";
+
+import { AdminClient } from "./admin";
+import { envConfig } from "./env";
+import { idProvider } from "./idProvider";
+import { semaphore } from "./semaphore";
+import { buildRestateService } from "./service";
+
+class RestateQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly client: restateClient.Ingress,
+ private readonly adminClient: AdminClient,
+ public readonly opts: QueueOptions,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ interface MyService {
+ run: (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => Promise<void>;
+ }
+ const cl = this.client.serviceSendClient<MyService>({ name: this.name() });
+ const res = await cl.run(
+ {
+ payload,
+ priority: options?.priority ?? 0,
+ },
+ restateClient.rpc.sendOpts({
+ delay: options?.delayMs
+ ? {
+ milliseconds: options.delayMs,
+ }
+ : undefined,
+ idempotencyKey: options?.idempotencyKey,
+ }),
+ );
+ return res.invocationId;
+ }
+
+ async stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }> {
+ const res = await this.adminClient.getStats(this.name());
+ return {
+ pending: res.pending + res.ready,
+ pending_retry: res["backing-off"] + res.paused + res.suspended,
+ running: res.running,
+ failed: 0,
+ };
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ throw new Error("Method not implemented.");
+ }
+}
+
+class RestateRunnerWrapper<T> implements Runner<T> {
+ constructor(
+ private readonly wf: restate.ServiceDefinition<
+ string,
+ {
+ run: (ctx: restate.Context, data: T) => Promise<void>;
+ }
+ >,
+ ) {}
+
+ async run(): Promise<void> {
+ // No-op for restate
+ }
+
+ async stop(): Promise<void> {
+ // No-op for restate
+ }
+
+ async runUntilEmpty(): Promise<void> {
+ throw new Error("Method not implemented.");
+ }
+
+ get def(): restate.WorkflowDefinition<string, unknown> {
+ return this.wf;
+ }
+}
+
+class RestateQueueClient implements QueueClient {
+ private client: restateClient.Ingress;
+ private adminClient: AdminClient;
+ private queues = new Map<string, RestateQueueWrapper<unknown>>();
+ private services = new Map<string, RestateRunnerWrapper<unknown>>();
+
+ constructor() {
+ this.client = restateClient.connect({
+ url: envConfig.RESTATE_INGRESS_ADDR,
+ });
+ this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR);
+ }
+
+ async prepare(): Promise<void> {
+ // No-op for restate
+ }
+
+ async start(): Promise<void> {
+ const port = await restate.serve({
+ port: envConfig.RESTATE_LISTEN_PORT ?? 0,
+ services: [
+ ...[...this.services.values()].map((svc) => svc.def),
+ semaphore,
+ idProvider,
+ ],
+ identityKeys: envConfig.RESTATE_PUB_KEY
+ ? [envConfig.RESTATE_PUB_KEY]
+ : undefined,
+ logger: (meta, msg) => {
+ if (meta.context) {
+ // No need to log invocation logs
+ } else {
+ logger.log(meta.level, `[restate] ${msg}`);
+ }
+ },
+ });
+ logger.info(`Restate listening on port ${port}`);
+ }
+
+ createQueue<T>(name: string, opts: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const wrapper = new RestateQueueWrapper<T>(
+ name,
+ this.client,
+ this.adminClient,
+ opts,
+ );
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.services.get(name);
+ if (wrapper) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const svc = new RestateRunnerWrapper<T>(
+ buildRestateService(queue, funcs, opts, queue.opts),
+ );
+ this.services.set(name, svc);
+ return svc;
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class RestateQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ static isConfigured(): boolean {
+ return envConfig.RESTATE_LISTEN_PORT !== undefined;
+ }
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new RestateQueueClient();
+ this.client = client;
+ }
+ return this.client;
+ }
+}
diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts
new file mode 100644
index 00000000..ad636f98
--- /dev/null
+++ b/packages/plugins/queue-restate/src/semaphore.ts
@@ -0,0 +1,109 @@
+// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts
+
+import { Context, object, ObjectContext } from "@restatedev/restate-sdk";
+
+interface QueueItem {
+ awakeable: string;
+ priority: number;
+}
+
+interface QueueState {
+ items: QueueItem[];
+ inFlight: number;
+}
+
+export const semaphore = object({
+ name: "Semaphore",
+ handlers: {
+ acquire: async (
+ ctx: ObjectContext<QueueState>,
+ req: { awakeableId: string; priority: number; capacity: number },
+ ): Promise<void> => {
+ const state = await getState(ctx);
+
+ state.items.push({
+ awakeable: req.awakeableId,
+ priority: req.priority,
+ });
+
+ tick(ctx, state, req.capacity);
+
+ setState(ctx, state);
+ },
+
+ release: async (
+ ctx: ObjectContext<QueueState>,
+ capacity: number,
+ ): Promise<void> => {
+ const state = await getState(ctx);
+ state.inFlight--;
+ tick(ctx, state, capacity);
+ setState(ctx, state);
+ },
+ },
+ options: {
+ ingressPrivate: true,
+ },
+});
+
+// Lower numbers represent higher priority, mirroring Liteque’s semantics.
+function selectAndPopItem(items: QueueItem[]): QueueItem {
+ let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 };
+ for (const [i, item] of items.entries()) {
+ if (item.priority < selected.priority) {
+ selected.priority = item.priority;
+ selected.index = i;
+ }
+ }
+ const [item] = items.splice(selected.index, 1);
+ return item;
+}
+
+function tick(
+ ctx: ObjectContext<QueueState>,
+ state: QueueState,
+ capacity: number,
+) {
+ while (state.inFlight < capacity && state.items.length > 0) {
+ const item = selectAndPopItem(state.items);
+ state.inFlight++;
+ ctx.resolveAwakeable(item.awakeable);
+ }
+}
+
+async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> {
+ return {
+ items: (await ctx.get("items")) ?? [],
+ inFlight: (await ctx.get("inFlight")) ?? 0,
+ };
+}
+
+function setState(ctx: ObjectContext<QueueState>, state: QueueState) {
+ ctx.set("items", state.items);
+ ctx.set("inFlight", state.inFlight);
+}
+
+export class RestateSemaphore {
+ constructor(
+ private readonly ctx: Context,
+ private readonly id: string,
+ private readonly capacity: number,
+ ) {}
+
+ async acquire(priority: number) {
+ const awk = this.ctx.awakeable();
+ await this.ctx
+ .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
+ .acquire({
+ awakeableId: awk.id,
+ priority,
+ capacity: this.capacity,
+ });
+ await awk.promise;
+ }
+ async release() {
+ await this.ctx
+ .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id)
+ .release(this.capacity);
+ }
+}
diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts
new file mode 100644
index 00000000..de5b070f
--- /dev/null
+++ b/packages/plugins/queue-restate/src/service.ts
@@ -0,0 +1,133 @@
+import * as restate from "@restatedev/restate-sdk";
+
+import type {
+ Queue,
+ QueueOptions,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import { genId } from "./idProvider";
+import { RestateSemaphore } from "./semaphore";
+
+export function buildRestateService<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ queueOpts: QueueOptions,
+) {
+ const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries;
+ return restate.service({
+ name: queue.name(),
+ options: {
+ inactivityTimeout: {
+ seconds: opts.timeoutSecs,
+ },
+ },
+ handlers: {
+ run: async (
+ ctx: restate.Context,
+ data: {
+ payload: T;
+ priority: number;
+ },
+ ) => {
+ const id = `${await genId(ctx)}`;
+ let payload = data.payload;
+ if (opts.validator) {
+ const res = opts.validator.safeParse(data.payload);
+ if (!res.success) {
+ throw new restate.TerminalError(res.error.message, {
+ errorCode: 400,
+ });
+ }
+ payload = res.data;
+ }
+
+ const priority = data.priority ?? 0;
+
+ const semaphore = new RestateSemaphore(
+ ctx,
+ `queue:${queue.name()}`,
+ opts.concurrency,
+ );
+
+ let lastError: Error | undefined;
+ for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) {
+ await semaphore.acquire(priority);
+ const res = await runWorkerLogic(ctx, funcs, {
+ id,
+ data: payload,
+ priority,
+ runNumber,
+ numRetriesLeft: NUM_RETRIES - runNumber,
+ abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000),
+ });
+ await semaphore.release();
+ if (res.error) {
+ lastError = res.error;
+ // TODO: add backoff
+ await ctx.sleep(1000);
+ } else {
+ break;
+ }
+ }
+ if (lastError) {
+ throw new restate.TerminalError(lastError.message, {
+ errorCode: 500,
+ cause: "cause" in lastError ? lastError.cause : undefined,
+ });
+ }
+ },
+ },
+ });
+}
+
+async function runWorkerLogic<T>(
+ ctx: restate.Context,
+ { run, onError, onComplete }: RunnerFuncs<T>,
+ data: {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ numRetriesLeft: number;
+ abortSignal: AbortSignal;
+ },
+) {
+ const res = await tryCatch(
+ ctx.run(
+ `main logic`,
+ async () => {
+ await run(data);
+ },
+ {
+ maxRetryAttempts: 1,
+ },
+ ),
+ );
+ if (res.error) {
+ await tryCatch(
+ ctx.run(
+ `onError`,
+ async () =>
+ onError?.({
+ ...data,
+ error: res.error,
+ }),
+ {
+ maxRetryAttempts: 1,
+ },
+ ),
+ );
+ return res;
+ }
+
+ await tryCatch(
+ ctx.run("onComplete", async () => await onComplete?.(data), {
+ maxRetryAttempts: 1,
+ }),
+ );
+ return res;
+}
diff --git a/packages/plugins/queue-restate/src/tests/docker-compose.yml b/packages/plugins/queue-restate/src/tests/docker-compose.yml
new file mode 100644
index 00000000..f24c2921
--- /dev/null
+++ b/packages/plugins/queue-restate/src/tests/docker-compose.yml
@@ -0,0 +1,8 @@
+services:
+ restate:
+ image: docker.restate.dev/restatedev/restate:1.5
+ ports:
+ - "${RESTATE_INGRESS_PORT:-8080}:8080"
+ - "${RESTATE_ADMIN_PORT:-9070}:9070"
+ extra_hosts:
+ - "host.docker.internal:host-gateway"
diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts
new file mode 100644
index 00000000..e59d47cb
--- /dev/null
+++ b/packages/plugins/queue-restate/src/tests/queue.test.ts
@@ -0,0 +1,221 @@
+import {
+ afterAll,
+ afterEach,
+ beforeAll,
+ beforeEach,
+ describe,
+ expect,
+ inject,
+ it,
+} from "vitest";
+
+import type { Queue, QueueClient } from "@karakeep/shared/queueing";
+
+import { AdminClient } from "../admin.js";
+import { RestateQueueProvider } from "../index.js";
+import { waitUntil } from "./utils.js";
+
+type TestAction =
+ | { type: "val"; val: number }
+ | { type: "err"; err: string }
+ | { type: "stall"; durSec: number };
+
+describe("Restate Queue Provider", () => {
+ let queueClient: QueueClient;
+ let queue: Queue<TestAction>;
+ let adminClient: AdminClient;
+
+ const testState = {
+ results: [] as number[],
+ errors: [] as string[],
+ inFlight: 0,
+ maxInFlight: 0,
+ };
+
+ async function waitUntilQueueEmpty() {
+ await waitUntil(
+ async () => {
+ const stats = await queue.stats();
+ return stats.pending + stats.pending_retry + stats.running === 0;
+ },
+ "Queue to be empty",
+ 60000,
+ );
+ }
+
+ beforeEach(async () => {
+ testState.results = [];
+ testState.errors = [];
+ testState.inFlight = 0;
+ testState.maxInFlight = 0;
+ });
+ afterEach(async () => {
+ await waitUntilQueueEmpty();
+ });
+
+ beforeAll(async () => {
+ const ingressPort = inject("restateIngressPort");
+ const adminPort = inject("restateAdminPort");
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ const provider = new RestateQueueProvider();
+ const client = await provider.getClient();
+
+ if (!client) {
+ throw new Error("Failed to create queue client");
+ }
+
+ queueClient = client;
+ adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR);
+
+ queue = queueClient.createQueue<TestAction>("test-queue", {
+ defaultJobArgs: {
+ numRetries: 3,
+ },
+ keepFailedJobs: false,
+ });
+
+ queueClient.createRunner(
+ queue,
+ {
+ run: async (job) => {
+ testState.inFlight++;
+ testState.maxInFlight = Math.max(
+ testState.maxInFlight,
+ testState.inFlight,
+ );
+ const jobData = job.data;
+ switch (jobData.type) {
+ case "val":
+ testState.results.push(jobData.val);
+ break;
+ case "err":
+ throw new Error(jobData.err);
+ case "stall":
+ await new Promise((resolve) =>
+ setTimeout(resolve, jobData.durSec * 1000),
+ );
+ break;
+ }
+ },
+ onError: async (job) => {
+ testState.inFlight--;
+ const jobData = job.data;
+ if (jobData && jobData.type === "err") {
+ testState.errors.push(jobData.err);
+ }
+ },
+ onComplete: async () => {
+ testState.inFlight--;
+ },
+ },
+ {
+ concurrency: 3,
+ timeoutSecs: 2,
+ pollIntervalMs: 0 /* Doesn't matter */,
+ },
+ );
+
+ await queueClient.prepare();
+ await queueClient.start();
+
+ await adminClient.upsertDeployment("http://host.docker.internal:9080");
+ }, 90000);
+
+ afterAll(async () => {
+ if (queueClient?.shutdown) {
+ await queueClient.shutdown();
+ }
+ });
+
+ it("should enqueue and process a job", async () => {
+ const jobId = await queue.enqueue({ type: "val", val: 42 });
+
+ expect(jobId).toBeDefined();
+ expect(typeof jobId).toBe("string");
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([42]);
+ }, 60000);
+
+ it("should process multiple jobs", async () => {
+ await queue.enqueue({ type: "val", val: 1 });
+ await queue.enqueue({ type: "val", val: 2 });
+ await queue.enqueue({ type: "val", val: 3 });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results.length).toEqual(3);
+ expect(testState.results).toContain(1);
+ expect(testState.results).toContain(2);
+ expect(testState.results).toContain(3);
+ }, 60000);
+
+ it("should retry failed jobs", async () => {
+ await queue.enqueue({ type: "err", err: "Test error" });
+
+ await waitUntilQueueEmpty();
+
+ // Initial attempt + 3 retries
+ expect(testState.errors).toEqual([
+ "Test error",
+ "Test error",
+ "Test error",
+ "Test error",
+ ]);
+ }, 90000);
+
+ it("should use idempotency key", async () => {
+ const idempotencyKey = `test-${Date.now()}`;
+
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+ await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey });
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([200]);
+ }, 60000);
+
+ it("should handle concurrent jobs", async () => {
+ const promises = [];
+ for (let i = 300; i < 320; i++) {
+ promises.push(queue.enqueue({ type: "stall", durSec: 0.1 }));
+ }
+ await Promise.all(promises);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.maxInFlight).toEqual(3);
+ }, 60000);
+
+ it("should handle priorities", async () => {
+ // Hog the queue first
+ await Promise.all([
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }),
+ queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }),
+ ]);
+
+ // Then those will get reprioritized
+ await Promise.all([
+ queue.enqueue({ type: "val", val: 200 }, { priority: -1 }),
+ queue.enqueue({ type: "val", val: 201 }, { priority: -2 }),
+ queue.enqueue({ type: "val", val: 202 }, { priority: -3 }),
+
+ queue.enqueue({ type: "val", val: 300 }, { priority: 0 }),
+ queue.enqueue({ type: "val", val: 301 }, { priority: 1 }),
+ queue.enqueue({ type: "val", val: 302 }, { priority: 2 }),
+ ]);
+
+ await waitUntilQueueEmpty();
+
+ expect(testState.results).toEqual([
+ // Lower numeric priority value should run first
+ 202, 201, 200, 300, 301, 302,
+ ]);
+ }, 60000);
+});
diff --git a/packages/plugins/queue-restate/src/tests/setup/startContainers.ts b/packages/plugins/queue-restate/src/tests/setup/startContainers.ts
new file mode 100644
index 00000000..7d9dea5c
--- /dev/null
+++ b/packages/plugins/queue-restate/src/tests/setup/startContainers.ts
@@ -0,0 +1,90 @@
+import { execSync } from "child_process";
+import net from "net";
+import path from "path";
+import { fileURLToPath } from "url";
+import type { GlobalSetupContext } from "vitest/node";
+
+import { waitUntil } from "../utils.js";
+
+async function getRandomPort(): Promise<number> {
+ const server = net.createServer();
+ return new Promise<number>((resolve, reject) => {
+ server.unref();
+ server.on("error", reject);
+ server.listen(0, () => {
+ const port = (server.address() as net.AddressInfo).port;
+ server.close(() => resolve(port));
+ });
+ });
+}
+
+async function waitForHealthy(
+ ingressPort: number,
+ adminPort: number,
+ timeout = 60000,
+): Promise<void> {
+ await waitUntil(
+ async () => {
+ const response = await fetch(`http://localhost:${adminPort}/health`);
+ return response.ok;
+ },
+ "Restate admin API is healthy",
+ timeout,
+ );
+
+ await waitUntil(
+ async () => {
+ const response = await fetch(
+ `http://localhost:${ingressPort}/restate/health`,
+ );
+ return response.ok;
+ },
+ "Restate ingress is healthy",
+ timeout,
+ );
+}
+
+export default async function ({ provide }: GlobalSetupContext) {
+ const __dirname = path.dirname(fileURLToPath(import.meta.url));
+ const ingressPort = await getRandomPort();
+ const adminPort = await getRandomPort();
+
+ console.log(
+ `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`,
+ );
+ execSync(`docker compose up -d`, {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ env: {
+ ...process.env,
+ RESTATE_INGRESS_PORT: ingressPort.toString(),
+ RESTATE_ADMIN_PORT: adminPort.toString(),
+ },
+ });
+
+ console.log("Waiting for Restate to become healthy...");
+ await waitForHealthy(ingressPort, adminPort);
+
+ provide("restateIngressPort", ingressPort);
+ provide("restateAdminPort", adminPort);
+
+ process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`;
+ process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`;
+ process.env.RESTATE_LISTEN_PORT = "9080";
+
+ return async () => {
+ console.log("Stopping Restate...");
+ execSync("docker compose down", {
+ cwd: path.join(__dirname, ".."),
+ stdio: "ignore",
+ });
+ return Promise.resolve();
+ };
+}
+
+declare module "vitest" {
+ export interface ProvidedContext {
+ restateIngressPort: number;
+ restateAdminPort: number;
+ }
+}
diff --git a/packages/plugins/queue-restate/src/tests/utils.ts b/packages/plugins/queue-restate/src/tests/utils.ts
new file mode 100644
index 00000000..e02d2dee
--- /dev/null
+++ b/packages/plugins/queue-restate/src/tests/utils.ts
@@ -0,0 +1,23 @@
+export async function waitUntil(
+ f: () => Promise<boolean>,
+ description: string,
+ timeoutMs = 60000,
+): Promise<void> {
+ const startTime = Date.now();
+
+ while (Date.now() - startTime < timeoutMs) {
+ console.log(`Waiting for ${description}...`);
+ try {
+ const res = await f();
+ if (res) {
+ console.log(`${description}: success`);
+ return;
+ }
+ } catch (error) {
+ console.log(`${description}: error, retrying...: ${error}`);
+ }
+ await new Promise((resolve) => setTimeout(resolve, 1000));
+ }
+
+ throw new Error(`${description}: timeout after ${timeoutMs}ms`);
+}
diff --git a/packages/plugins/search-meilisearch/index.ts b/packages/plugins/search-meilisearch/index.ts
new file mode 100644
index 00000000..3496d52f
--- /dev/null
+++ b/packages/plugins/search-meilisearch/index.ts
@@ -0,0 +1,12 @@
+// Auto-register the MeiliSearch provider when this package is imported
+import { PluginManager, PluginType } from "@karakeep/shared/plugins";
+
+import { MeiliSearchProvider } from "./src";
+
+if (MeiliSearchProvider.isConfigured()) {
+ PluginManager.register({
+ type: PluginType.Search,
+ name: "MeiliSearch",
+ provider: new MeiliSearchProvider(),
+ });
+}
diff --git a/packages/plugins/search-meilisearch/src/env.ts b/packages/plugins/search-meilisearch/src/env.ts
new file mode 100644
index 00000000..c06fdd55
--- /dev/null
+++ b/packages/plugins/search-meilisearch/src/env.ts
@@ -0,0 +1,8 @@
+import { z } from "zod";
+
+export const envConfig = z
+ .object({
+ MEILI_ADDR: z.string().optional(),
+ MEILI_MASTER_KEY: z.string().default(""),
+ })
+ .parse(process.env);
diff --git a/packages/plugins/search-meilisearch/src/index.ts b/packages/plugins/search-meilisearch/src/index.ts
new file mode 100644
index 00000000..30da4a64
--- /dev/null
+++ b/packages/plugins/search-meilisearch/src/index.ts
@@ -0,0 +1,159 @@
+import type { Index } from "meilisearch";
+import { MeiliSearch } from "meilisearch";
+
+import type {
+ BookmarkSearchDocument,
+ FilterQuery,
+ SearchIndexClient,
+ SearchOptions,
+ SearchResponse,
+} from "@karakeep/shared/search";
+import serverConfig from "@karakeep/shared/config";
+import { PluginProvider } from "@karakeep/shared/plugins";
+
+import { envConfig } from "./env";
+
+function filterToMeiliSearchFilter(filter: FilterQuery): string {
+ switch (filter.type) {
+ case "eq":
+ return `${filter.field} = "${filter.value}"`;
+ case "in":
+ return `${filter.field} IN [${filter.values.join(",")}]`;
+ default: {
+ const exhaustiveCheck: never = filter;
+ throw new Error(`Unhandled color case: ${exhaustiveCheck}`);
+ }
+ }
+}
+
+class MeiliSearchIndexClient implements SearchIndexClient {
+ constructor(private index: Index<BookmarkSearchDocument>) {}
+
+ async addDocuments(documents: BookmarkSearchDocument[]): Promise<void> {
+ const task = await this.index.addDocuments(documents, {
+ primaryKey: "id",
+ });
+ await this.ensureTaskSuccess(task.taskUid);
+ }
+
+ async deleteDocuments(ids: string[]): Promise<void> {
+ const task = await this.index.deleteDocuments(ids);
+ await this.ensureTaskSuccess(task.taskUid);
+ }
+
+ async search(options: SearchOptions): Promise<SearchResponse> {
+ const result = await this.index.search(options.query, {
+ filter: options.filter?.map((f) => filterToMeiliSearchFilter(f)),
+ limit: options.limit,
+ offset: options.offset,
+ sort: options.sort?.map((s) => `${s.field}:${s.order}`),
+ attributesToRetrieve: ["id"],
+ showRankingScore: true,
+ });
+
+ return {
+ hits: result.hits.map((hit) => ({
+ id: hit.id,
+ score: hit._rankingScore,
+ })),
+ totalHits: result.estimatedTotalHits ?? 0,
+ processingTimeMs: result.processingTimeMs,
+ };
+ }
+
+ async clearIndex(): Promise<void> {
+ const task = await this.index.deleteAllDocuments();
+ await this.ensureTaskSuccess(task.taskUid);
+ }
+
+ private async ensureTaskSuccess(taskUid: number): Promise<void> {
+ const task = await this.index.waitForTask(taskUid, {
+ intervalMs: 200,
+ timeOutMs: serverConfig.search.jobTimeoutSec * 1000 * 0.9,
+ });
+ if (task.error) {
+ throw new Error(`Search task failed: ${task.error.message}`);
+ }
+ }
+}
+
+export class MeiliSearchProvider implements PluginProvider<SearchIndexClient> {
+ private client: MeiliSearch | undefined;
+ private indexClient: SearchIndexClient | undefined;
+ private readonly indexName = "bookmarks";
+
+ constructor() {
+ if (MeiliSearchProvider.isConfigured()) {
+ this.client = new MeiliSearch({
+ host: envConfig.MEILI_ADDR!,
+ apiKey: envConfig.MEILI_MASTER_KEY,
+ });
+ }
+ }
+
+ static isConfigured(): boolean {
+ return !!envConfig.MEILI_ADDR;
+ }
+
+ async getClient(): Promise<SearchIndexClient | null> {
+ if (this.indexClient) {
+ return this.indexClient;
+ }
+
+ if (!this.client) {
+ return null;
+ }
+
+ const indices = await this.client.getIndexes();
+ let indexFound = indices.results.find((i) => i.uid === this.indexName);
+
+ if (!indexFound) {
+ const idx = await this.client.createIndex(this.indexName, {
+ primaryKey: "id",
+ });
+ await this.client.waitForTask(idx.taskUid);
+ indexFound = await this.client.getIndex<BookmarkSearchDocument>(
+ this.indexName,
+ );
+ }
+
+ await this.configureIndex(indexFound);
+ this.indexClient = new MeiliSearchIndexClient(indexFound);
+ return this.indexClient;
+ }
+
+ private async configureIndex(
+ index: Index<BookmarkSearchDocument>,
+ ): Promise<void> {
+ const desiredFilterableAttributes = ["id", "userId"].sort();
+ const desiredSortableAttributes = ["createdAt"].sort();
+
+ const settings = await index.getSettings();
+
+ if (
+ JSON.stringify(settings.filterableAttributes?.sort()) !==
+ JSON.stringify(desiredFilterableAttributes)
+ ) {
+ console.log(
+ `[meilisearch] Updating desired filterable attributes to ${desiredFilterableAttributes} from ${settings.filterableAttributes}`,
+ );
+ const taskId = await index.updateFilterableAttributes(
+ desiredFilterableAttributes,
+ );
+ await this.client!.waitForTask(taskId.taskUid);
+ }
+
+ if (
+ JSON.stringify(settings.sortableAttributes?.sort()) !==
+ JSON.stringify(desiredSortableAttributes)
+ ) {
+ console.log(
+ `[meilisearch] Updating desired sortable attributes to ${desiredSortableAttributes} from ${settings.sortableAttributes}`,
+ );
+ const taskId = await index.updateSortableAttributes(
+ desiredSortableAttributes,
+ );
+ await this.client!.waitForTask(taskId.taskUid);
+ }
+ }
+}
diff --git a/packages/plugins/tsconfig.json b/packages/plugins/tsconfig.json
new file mode 100644
index 00000000..a795b96a
--- /dev/null
+++ b/packages/plugins/tsconfig.json
@@ -0,0 +1,9 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@karakeep/tsconfig/node.json",
+ "include": ["**/*.ts"],
+ "exclude": ["node_modules"],
+ "compilerOptions": {
+ "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
+ }
+}
diff --git a/packages/plugins/vitest.config.ts b/packages/plugins/vitest.config.ts
new file mode 100644
index 00000000..3d5f33f7
--- /dev/null
+++ b/packages/plugins/vitest.config.ts
@@ -0,0 +1,14 @@
+/// <reference types="vitest" />
+
+import tsconfigPaths from "vite-tsconfig-paths";
+import { defineConfig } from "vitest/config";
+
+export default defineConfig({
+ plugins: [tsconfigPaths()],
+ test: {
+ globalSetup: ["./queue-restate/src/tests/setup/startContainers.ts"],
+ teardownTimeout: 30000,
+ include: ["**/src/tests/**/*.test.ts"],
+ testTimeout: 60000,
+ },
+});