From 99413db0e79a156a1b87eacd3c6a7b83e9df946e Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 8 Nov 2025 14:50:00 +0000 Subject: refactor: consolidate multiple karakeep plugins into one package (#2101) * refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude --- packages/plugins-queue-liteque/.oxlintrc.json | 19 -- packages/plugins-queue-liteque/index.ts | 10 - packages/plugins-queue-liteque/package.json | 26 --- packages/plugins-queue-liteque/src/index.ts | 137 ------------- packages/plugins-queue-liteque/tsconfig.json | 10 - packages/plugins-queue-restate/.oxlintrc.json | 19 -- packages/plugins-queue-restate/index.ts | 12 -- packages/plugins-queue-restate/package.json | 27 --- packages/plugins-queue-restate/src/admin.ts | 75 ------- packages/plugins-queue-restate/src/env.ts | 13 -- packages/plugins-queue-restate/src/idProvider.ts | 21 -- packages/plugins-queue-restate/src/index.ts | 201 ------------------- packages/plugins-queue-restate/src/semaphore.ts | 109 ---------- packages/plugins-queue-restate/src/service.ts | 133 ------------- .../src/tests/docker-compose.yml | 8 - .../plugins-queue-restate/src/tests/queue.test.ts | 221 --------------------- .../src/tests/setup/startContainers.ts | 90 --------- packages/plugins-queue-restate/src/tests/utils.ts | 23 --- packages/plugins-queue-restate/tsconfig.json | 10 - packages/plugins-queue-restate/vitest.config.ts | 14 -- packages/plugins-search-meilisearch/.oxlintrc.json | 19 -- packages/plugins-search-meilisearch/index.ts | 12 -- packages/plugins-search-meilisearch/package.json | 26 --- packages/plugins-search-meilisearch/src/env.ts | 8 - packages/plugins-search-meilisearch/src/index.ts | 159 --------------- packages/plugins-search-meilisearch/tsconfig.json | 9 - packages/plugins/.oxlintrc.json | 19 ++ packages/plugins/package.json | 34 ++++ packages/plugins/queue-liteque/index.ts | 10 + packages/plugins/queue-liteque/src/index.ts | 137 +++++++++++++ packages/plugins/queue-restate/index.ts | 12 ++ packages/plugins/queue-restate/src/admin.ts | 75 +++++++ packages/plugins/queue-restate/src/env.ts | 13 ++ packages/plugins/queue-restate/src/idProvider.ts | 21 ++ packages/plugins/queue-restate/src/index.ts | 201 +++++++++++++++++++ packages/plugins/queue-restate/src/semaphore.ts | 109 ++++++++++ packages/plugins/queue-restate/src/service.ts | 133 +++++++++++++ .../queue-restate/src/tests/docker-compose.yml | 8 + .../plugins/queue-restate/src/tests/queue.test.ts | 221 +++++++++++++++++++++ .../src/tests/setup/startContainers.ts | 90 +++++++++ packages/plugins/queue-restate/src/tests/utils.ts | 23 +++ packages/plugins/search-meilisearch/index.ts | 12 ++ packages/plugins/search-meilisearch/src/env.ts | 8 + packages/plugins/search-meilisearch/src/index.ts | 159 +++++++++++++++ packages/plugins/tsconfig.json | 9 + packages/plugins/vitest.config.ts | 14 ++ packages/shared-server/package.json | 4 +- packages/shared-server/src/plugins.ts | 6 +- pnpm-lock.yaml | 80 ++------ 49 files changed, 1330 insertions(+), 1479 deletions(-) delete mode 100644 packages/plugins-queue-liteque/.oxlintrc.json delete mode 100644 packages/plugins-queue-liteque/index.ts delete mode 100644 packages/plugins-queue-liteque/package.json delete mode 100644 packages/plugins-queue-liteque/src/index.ts delete mode 100644 packages/plugins-queue-liteque/tsconfig.json delete mode 100644 packages/plugins-queue-restate/.oxlintrc.json delete mode 100644 packages/plugins-queue-restate/index.ts delete mode 100644 packages/plugins-queue-restate/package.json delete mode 100644 packages/plugins-queue-restate/src/admin.ts delete mode 100644 packages/plugins-queue-restate/src/env.ts delete mode 100644 packages/plugins-queue-restate/src/idProvider.ts delete mode 100644 packages/plugins-queue-restate/src/index.ts delete mode 100644 packages/plugins-queue-restate/src/semaphore.ts delete mode 100644 packages/plugins-queue-restate/src/service.ts delete mode 100644 packages/plugins-queue-restate/src/tests/docker-compose.yml delete mode 100644 packages/plugins-queue-restate/src/tests/queue.test.ts delete mode 100644 packages/plugins-queue-restate/src/tests/setup/startContainers.ts delete mode 100644 packages/plugins-queue-restate/src/tests/utils.ts delete mode 100644 packages/plugins-queue-restate/tsconfig.json delete mode 100644 packages/plugins-queue-restate/vitest.config.ts delete mode 100644 packages/plugins-search-meilisearch/.oxlintrc.json delete mode 100644 packages/plugins-search-meilisearch/index.ts delete mode 100644 packages/plugins-search-meilisearch/package.json delete mode 100644 packages/plugins-search-meilisearch/src/env.ts delete mode 100644 packages/plugins-search-meilisearch/src/index.ts delete mode 100644 packages/plugins-search-meilisearch/tsconfig.json create mode 100644 packages/plugins/.oxlintrc.json create mode 100644 packages/plugins/package.json create mode 100644 packages/plugins/queue-liteque/index.ts create mode 100644 packages/plugins/queue-liteque/src/index.ts create mode 100644 packages/plugins/queue-restate/index.ts create mode 100644 packages/plugins/queue-restate/src/admin.ts create mode 100644 packages/plugins/queue-restate/src/env.ts create mode 100644 packages/plugins/queue-restate/src/idProvider.ts create mode 100644 packages/plugins/queue-restate/src/index.ts create mode 100644 packages/plugins/queue-restate/src/semaphore.ts create mode 100644 packages/plugins/queue-restate/src/service.ts create mode 100644 packages/plugins/queue-restate/src/tests/docker-compose.yml create mode 100644 packages/plugins/queue-restate/src/tests/queue.test.ts create mode 100644 packages/plugins/queue-restate/src/tests/setup/startContainers.ts create mode 100644 packages/plugins/queue-restate/src/tests/utils.ts create mode 100644 packages/plugins/search-meilisearch/index.ts create mode 100644 packages/plugins/search-meilisearch/src/env.ts create mode 100644 packages/plugins/search-meilisearch/src/index.ts create mode 100644 packages/plugins/tsconfig.json create mode 100644 packages/plugins/vitest.config.ts diff --git a/packages/plugins-queue-liteque/.oxlintrc.json b/packages/plugins-queue-liteque/.oxlintrc.json deleted file mode 100644 index 79ba0255..00000000 --- a/packages/plugins-queue-liteque/.oxlintrc.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "../../node_modules/oxlint/configuration_schema.json", - "extends": [ - "../../tooling/oxlint/oxlint-base.json" - ], - "env": { - "builtin": true, - "commonjs": true - }, - "ignorePatterns": [ - "**/*.config.js", - "**/*.config.cjs", - "**/.eslintrc.cjs", - "**/.next", - "**/dist", - "**/build", - "**/pnpm-lock.yaml" - ] -} diff --git a/packages/plugins-queue-liteque/index.ts b/packages/plugins-queue-liteque/index.ts deleted file mode 100644 index c3f7f03b..00000000 --- a/packages/plugins-queue-liteque/index.ts +++ /dev/null @@ -1,10 +0,0 @@ -// Auto-register the Liteque queue provider when this package is imported -import { PluginManager, PluginType } from "@karakeep/shared/plugins"; - -import { LitequeQueueProvider } from "./src"; - -PluginManager.register({ - type: PluginType.Queue, - name: "Liteque", - provider: new LitequeQueueProvider(), -}); diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json deleted file mode 100644 index bb4b1aac..00000000 --- a/packages/plugins-queue-liteque/package.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/package.json", - "name": "@karakeep/plugins-queue-liteque", - "version": "0.1.0", - "private": true, - "type": "module", - "scripts": { - "typecheck": "tsc --noEmit", - "format": "prettier . --cache --ignore-path ../../.prettierignore --check", - "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", - "lint": "oxlint .", - "lint:fix": "oxlint . --fix", - "test": "vitest" - }, - "dependencies": { - "@karakeep/shared": "workspace:*", - "liteque": "^0.6.2" - }, - "devDependencies": { - "@karakeep/prettier-config": "workspace:^0.1.0", - "@karakeep/tsconfig": "workspace:^0.1.0", - "vite-tsconfig-paths": "^4.3.1", - "vitest": "^3.2.4" - }, - "prettier": "@karakeep/prettier-config" -} diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts deleted file mode 100644 index ddc2181c..00000000 --- a/packages/plugins-queue-liteque/src/index.ts +++ /dev/null @@ -1,137 +0,0 @@ -import path from "node:path"; -import { - buildDBClient, - SqliteQueue as LQ, - Runner as LQRunner, - migrateDB, -} from "liteque"; - -import type { PluginProvider } from "@karakeep/shared/plugins"; -import type { - DequeuedJob, - DequeuedJobError, - EnqueueOptions, - Queue, - QueueClient, - QueueOptions, - Runner, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import serverConfig from "@karakeep/shared/config"; - -class LitequeQueueWrapper implements Queue { - constructor( - private readonly _name: string, - private readonly lq: LQ, - public readonly opts: QueueOptions, - ) {} - - name(): string { - return this._name; - } - - async enqueue( - payload: T, - options?: EnqueueOptions, - ): Promise { - const job = await this.lq.enqueue(payload, options); - // liteque returns a Job with numeric id - return job ? String(job.id) : undefined; - } - - async stats() { - return this.lq.stats(); - } - - async cancelAllNonRunning(): Promise { - return this.lq.cancelAllNonRunning(); - } - - // Internal accessor for runner - get _impl(): LQ { - return this.lq; - } -} - -class LitequeQueueClient implements QueueClient { - private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { - walEnabled: serverConfig.database.walMode, - }); - - private queues = new Map>(); - - async prepare(): Promise { - migrateDB(this.db); - } - - async start(): Promise { - // No-op for sqlite - } - - createQueue(name: string, options: QueueOptions): Queue { - if (this.queues.has(name)) { - throw new Error(`Queue ${name} already exists`); - } - const lq = new LQ(name, this.db, { - defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, - keepFailedJobs: options.keepFailedJobs, - }); - const wrapper = new LitequeQueueWrapper(name, lq, options); - this.queues.set(name, wrapper); - return wrapper; - } - - createRunner( - queue: Queue, - funcs: RunnerFuncs, - opts: RunnerOptions, - ): Runner { - const name = queue.name(); - let wrapper = this.queues.get(name); - if (!wrapper) { - throw new Error(`Queue ${name} not found`); - } - - const runner = new LQRunner( - wrapper._impl, - { - run: funcs.run, - onComplete: funcs.onComplete as - | ((job: DequeuedJob) => Promise) - | undefined, - onError: funcs.onError as - | ((job: DequeuedJobError) => Promise) - | undefined, - }, - { - pollIntervalMs: opts.pollIntervalMs ?? 1000, - timeoutSecs: opts.timeoutSecs, - concurrency: opts.concurrency, - validator: opts.validator, - }, - ); - - return { - run: () => runner.run(), - stop: () => runner.stop(), - runUntilEmpty: () => runner.runUntilEmpty(), - }; - } - - async shutdown(): Promise { - // No-op for sqlite - } -} - -export class LitequeQueueProvider implements PluginProvider { - private client: QueueClient | null = null; - - async getClient(): Promise { - if (!this.client) { - const client = new LitequeQueueClient(); - this.client = client; - } - return this.client; - } -} diff --git a/packages/plugins-queue-liteque/tsconfig.json b/packages/plugins-queue-liteque/tsconfig.json deleted file mode 100644 index 3bfa695c..00000000 --- a/packages/plugins-queue-liteque/tsconfig.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/tsconfig", - "extends": "@karakeep/tsconfig/node.json", - "include": ["**/*.ts"], - "exclude": ["node_modules"], - "compilerOptions": { - "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" - } -} - diff --git a/packages/plugins-queue-restate/.oxlintrc.json b/packages/plugins-queue-restate/.oxlintrc.json deleted file mode 100644 index 79ba0255..00000000 --- a/packages/plugins-queue-restate/.oxlintrc.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "../../node_modules/oxlint/configuration_schema.json", - "extends": [ - "../../tooling/oxlint/oxlint-base.json" - ], - "env": { - "builtin": true, - "commonjs": true - }, - "ignorePatterns": [ - "**/*.config.js", - "**/*.config.cjs", - "**/.eslintrc.cjs", - "**/.next", - "**/dist", - "**/build", - "**/pnpm-lock.yaml" - ] -} diff --git a/packages/plugins-queue-restate/index.ts b/packages/plugins-queue-restate/index.ts deleted file mode 100644 index d313615c..00000000 --- a/packages/plugins-queue-restate/index.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Auto-register the Restate queue provider when this package is imported -import { PluginManager, PluginType } from "@karakeep/shared/plugins"; - -import { RestateQueueProvider } from "./src"; - -if (RestateQueueProvider.isConfigured()) { - PluginManager.register({ - type: PluginType.Queue, - name: "Restate", - provider: new RestateQueueProvider(), - }); -} diff --git a/packages/plugins-queue-restate/package.json b/packages/plugins-queue-restate/package.json deleted file mode 100644 index 16681150..00000000 --- a/packages/plugins-queue-restate/package.json +++ /dev/null @@ -1,27 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/package.json", - "name": "@karakeep/plugins-queue-restate", - "version": "0.1.0", - "private": true, - "type": "module", - "scripts": { - "typecheck": "tsc --noEmit", - "format": "prettier . --cache --ignore-path ../../.prettierignore --check", - "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", - "lint": "oxlint .", - "lint:fix": "oxlint . --fix", - "test": "vitest" - }, - "dependencies": { - "@karakeep/shared": "workspace:*", - "@restatedev/restate-sdk": "^1.9.0", - "@restatedev/restate-sdk-clients": "^1.9.0" - }, - "devDependencies": { - "@karakeep/prettier-config": "workspace:^0.1.0", - "@karakeep/tsconfig": "workspace:^0.1.0", - "vite-tsconfig-paths": "^4.3.1", - "vitest": "^3.2.4" - }, - "prettier": "@karakeep/prettier-config" -} diff --git a/packages/plugins-queue-restate/src/admin.ts b/packages/plugins-queue-restate/src/admin.ts deleted file mode 100644 index dddc8f00..00000000 --- a/packages/plugins-queue-restate/src/admin.ts +++ /dev/null @@ -1,75 +0,0 @@ -import { z } from "zod"; - -export class AdminClient { - constructor(private addr: string) {} - - async upsertDeployment(deploymentAddr: string) { - const res = await fetch(`${this.addr}/deployments`, { - method: "POST", - body: JSON.stringify({ - uri: deploymentAddr, - force: true, - }), - headers: { - "Content-Type": "application/json", - }, - }); - - if (!res.ok) { - throw new Error(`Failed to upsert deployment: ${res.status}`); - } - } - - async getStats(serviceName: string) { - const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`; - const res = await fetch(`${this.addr}/query`, { - method: "POST", - body: JSON.stringify({ - query, - }), - headers: { - "Content-Type": "application/json", - Accept: "application/json", - }, - }); - - if (!res.ok) { - throw new Error(`Failed to get stats: ${res.status}`); - } - const zStatus = z.enum([ - "pending", - "scheduled", - "ready", - "running", - "paused", - "backing-off", - "suspended", - "completed", - ]); - const zSchema = z.object({ - rows: z.array( - z.object({ - status: zStatus, - count: z.number(), - }), - ), - }); - - return zSchema.parse(await res.json()).rows.reduce( - (acc, cur) => { - acc[cur.status] = cur.count; - return acc; - }, - { - pending: 0, - scheduled: 0, - ready: 0, - running: 0, - paused: 0, - "backing-off": 0, - suspended: 0, - completed: 0, - } as Record, number>, - ); - } -} diff --git a/packages/plugins-queue-restate/src/env.ts b/packages/plugins-queue-restate/src/env.ts deleted file mode 100644 index 01175e86..00000000 --- a/packages/plugins-queue-restate/src/env.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { z } from "zod"; - -export const envConfig = z - .object({ - RESTATE_LISTEN_PORT: z.coerce.number().optional(), - RESTATE_INGRESS_ADDR: z - .string() - .optional() - .default("http://localhost:8080"), - RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"), - RESTATE_PUB_KEY: z.string().optional(), - }) - .parse(process.env); diff --git a/packages/plugins-queue-restate/src/idProvider.ts b/packages/plugins-queue-restate/src/idProvider.ts deleted file mode 100644 index ee85f46f..00000000 --- a/packages/plugins-queue-restate/src/idProvider.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; - -export const idProvider = object({ - name: "IdProvider", - handlers: { - get: async (ctx: ObjectContext<{ nextId: number }>): Promise => { - const state = (await ctx.get("nextId")) ?? 0; - ctx.set("nextId", state + 1); - return state; - }, - }, - options: { - ingressPrivate: true, - }, -}); - -export async function genId(ctx: Context) { - return ctx - .objectClient({ name: "IdProvider" }, "IdProvider") - .get(); -} diff --git a/packages/plugins-queue-restate/src/index.ts b/packages/plugins-queue-restate/src/index.ts deleted file mode 100644 index bedc26af..00000000 --- a/packages/plugins-queue-restate/src/index.ts +++ /dev/null @@ -1,201 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; -import * as restateClient from "@restatedev/restate-sdk-clients"; - -import type { PluginProvider } from "@karakeep/shared/plugins"; -import type { - EnqueueOptions, - Queue, - QueueClient, - QueueOptions, - Runner, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import logger from "@karakeep/shared/logger"; - -import { AdminClient } from "./admin"; -import { envConfig } from "./env"; -import { idProvider } from "./idProvider"; -import { semaphore } from "./semaphore"; -import { buildRestateService } from "./service"; - -class RestateQueueWrapper implements Queue { - constructor( - private readonly _name: string, - private readonly client: restateClient.Ingress, - private readonly adminClient: AdminClient, - public readonly opts: QueueOptions, - ) {} - - name(): string { - return this._name; - } - - async enqueue( - payload: T, - options?: EnqueueOptions, - ): Promise { - interface MyService { - run: ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => Promise; - } - const cl = this.client.serviceSendClient({ name: this.name() }); - const res = await cl.run( - { - payload, - priority: options?.priority ?? 0, - }, - restateClient.rpc.sendOpts({ - delay: options?.delayMs - ? { - milliseconds: options.delayMs, - } - : undefined, - idempotencyKey: options?.idempotencyKey, - }), - ); - return res.invocationId; - } - - async stats(): Promise<{ - pending: number; - pending_retry: number; - running: number; - failed: number; - }> { - const res = await this.adminClient.getStats(this.name()); - return { - pending: res.pending + res.ready, - pending_retry: res["backing-off"] + res.paused + res.suspended, - running: res.running, - failed: 0, - }; - } - - async cancelAllNonRunning(): Promise { - throw new Error("Method not implemented."); - } -} - -class RestateRunnerWrapper implements Runner { - constructor( - private readonly wf: restate.ServiceDefinition< - string, - { - run: (ctx: restate.Context, data: T) => Promise; - } - >, - ) {} - - async run(): Promise { - // No-op for restate - } - - async stop(): Promise { - // No-op for restate - } - - async runUntilEmpty(): Promise { - throw new Error("Method not implemented."); - } - - get def(): restate.WorkflowDefinition { - return this.wf; - } -} - -class RestateQueueClient implements QueueClient { - private client: restateClient.Ingress; - private adminClient: AdminClient; - private queues = new Map>(); - private services = new Map>(); - - constructor() { - this.client = restateClient.connect({ - url: envConfig.RESTATE_INGRESS_ADDR, - }); - this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); - } - - async prepare(): Promise { - // No-op for restate - } - - async start(): Promise { - const port = await restate.serve({ - port: envConfig.RESTATE_LISTEN_PORT ?? 0, - services: [ - ...[...this.services.values()].map((svc) => svc.def), - semaphore, - idProvider, - ], - identityKeys: envConfig.RESTATE_PUB_KEY - ? [envConfig.RESTATE_PUB_KEY] - : undefined, - logger: (meta, msg) => { - if (meta.context) { - // No need to log invocation logs - } else { - logger.log(meta.level, `[restate] ${msg}`); - } - }, - }); - logger.info(`Restate listening on port ${port}`); - } - - createQueue(name: string, opts: QueueOptions): Queue { - if (this.queues.has(name)) { - throw new Error(`Queue ${name} already exists`); - } - const wrapper = new RestateQueueWrapper( - name, - this.client, - this.adminClient, - opts, - ); - this.queues.set(name, wrapper); - return wrapper; - } - - createRunner( - queue: Queue, - funcs: RunnerFuncs, - opts: RunnerOptions, - ): Runner { - const name = queue.name(); - let wrapper = this.services.get(name); - if (wrapper) { - throw new Error(`Queue ${name} already exists`); - } - const svc = new RestateRunnerWrapper( - buildRestateService(queue, funcs, opts, queue.opts), - ); - this.services.set(name, svc); - return svc; - } - - async shutdown(): Promise { - // No-op for sqlite - } -} - -export class RestateQueueProvider implements PluginProvider { - private client: QueueClient | null = null; - - static isConfigured(): boolean { - return envConfig.RESTATE_LISTEN_PORT !== undefined; - } - - async getClient(): Promise { - if (!this.client) { - const client = new RestateQueueClient(); - this.client = client; - } - return this.client; - } -} diff --git a/packages/plugins-queue-restate/src/semaphore.ts b/packages/plugins-queue-restate/src/semaphore.ts deleted file mode 100644 index ad636f98..00000000 --- a/packages/plugins-queue-restate/src/semaphore.ts +++ /dev/null @@ -1,109 +0,0 @@ -// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts - -import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; - -interface QueueItem { - awakeable: string; - priority: number; -} - -interface QueueState { - items: QueueItem[]; - inFlight: number; -} - -export const semaphore = object({ - name: "Semaphore", - handlers: { - acquire: async ( - ctx: ObjectContext, - req: { awakeableId: string; priority: number; capacity: number }, - ): Promise => { - const state = await getState(ctx); - - state.items.push({ - awakeable: req.awakeableId, - priority: req.priority, - }); - - tick(ctx, state, req.capacity); - - setState(ctx, state); - }, - - release: async ( - ctx: ObjectContext, - capacity: number, - ): Promise => { - const state = await getState(ctx); - state.inFlight--; - tick(ctx, state, capacity); - setState(ctx, state); - }, - }, - options: { - ingressPrivate: true, - }, -}); - -// Lower numbers represent higher priority, mirroring Liteque’s semantics. -function selectAndPopItem(items: QueueItem[]): QueueItem { - let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; - for (const [i, item] of items.entries()) { - if (item.priority < selected.priority) { - selected.priority = item.priority; - selected.index = i; - } - } - const [item] = items.splice(selected.index, 1); - return item; -} - -function tick( - ctx: ObjectContext, - state: QueueState, - capacity: number, -) { - while (state.inFlight < capacity && state.items.length > 0) { - const item = selectAndPopItem(state.items); - state.inFlight++; - ctx.resolveAwakeable(item.awakeable); - } -} - -async function getState(ctx: ObjectContext): Promise { - return { - items: (await ctx.get("items")) ?? [], - inFlight: (await ctx.get("inFlight")) ?? 0, - }; -} - -function setState(ctx: ObjectContext, state: QueueState) { - ctx.set("items", state.items); - ctx.set("inFlight", state.inFlight); -} - -export class RestateSemaphore { - constructor( - private readonly ctx: Context, - private readonly id: string, - private readonly capacity: number, - ) {} - - async acquire(priority: number) { - const awk = this.ctx.awakeable(); - await this.ctx - .objectClient({ name: "Semaphore" }, this.id) - .acquire({ - awakeableId: awk.id, - priority, - capacity: this.capacity, - }); - await awk.promise; - } - async release() { - await this.ctx - .objectClient({ name: "Semaphore" }, this.id) - .release(this.capacity); - } -} diff --git a/packages/plugins-queue-restate/src/service.ts b/packages/plugins-queue-restate/src/service.ts deleted file mode 100644 index de5b070f..00000000 --- a/packages/plugins-queue-restate/src/service.ts +++ /dev/null @@ -1,133 +0,0 @@ -import * as restate from "@restatedev/restate-sdk"; - -import type { - Queue, - QueueOptions, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import { tryCatch } from "@karakeep/shared/tryCatch"; - -import { genId } from "./idProvider"; -import { RestateSemaphore } from "./semaphore"; - -export function buildRestateService( - queue: Queue, - funcs: RunnerFuncs, - opts: RunnerOptions, - queueOpts: QueueOptions, -) { - const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; - return restate.service({ - name: queue.name(), - options: { - inactivityTimeout: { - seconds: opts.timeoutSecs, - }, - }, - handlers: { - run: async ( - ctx: restate.Context, - data: { - payload: T; - priority: number; - }, - ) => { - const id = `${await genId(ctx)}`; - let payload = data.payload; - if (opts.validator) { - const res = opts.validator.safeParse(data.payload); - if (!res.success) { - throw new restate.TerminalError(res.error.message, { - errorCode: 400, - }); - } - payload = res.data; - } - - const priority = data.priority ?? 0; - - const semaphore = new RestateSemaphore( - ctx, - `queue:${queue.name()}`, - opts.concurrency, - ); - - let lastError: Error | undefined; - for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { - await semaphore.acquire(priority); - const res = await runWorkerLogic(ctx, funcs, { - id, - data: payload, - priority, - runNumber, - numRetriesLeft: NUM_RETRIES - runNumber, - abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), - }); - await semaphore.release(); - if (res.error) { - lastError = res.error; - // TODO: add backoff - await ctx.sleep(1000); - } else { - break; - } - } - if (lastError) { - throw new restate.TerminalError(lastError.message, { - errorCode: 500, - cause: "cause" in lastError ? lastError.cause : undefined, - }); - } - }, - }, - }); -} - -async function runWorkerLogic( - ctx: restate.Context, - { run, onError, onComplete }: RunnerFuncs, - data: { - id: string; - data: T; - priority: number; - runNumber: number; - numRetriesLeft: number; - abortSignal: AbortSignal; - }, -) { - const res = await tryCatch( - ctx.run( - `main logic`, - async () => { - await run(data); - }, - { - maxRetryAttempts: 1, - }, - ), - ); - if (res.error) { - await tryCatch( - ctx.run( - `onError`, - async () => - onError?.({ - ...data, - error: res.error, - }), - { - maxRetryAttempts: 1, - }, - ), - ); - return res; - } - - await tryCatch( - ctx.run("onComplete", async () => await onComplete?.(data), { - maxRetryAttempts: 1, - }), - ); - return res; -} diff --git a/packages/plugins-queue-restate/src/tests/docker-compose.yml b/packages/plugins-queue-restate/src/tests/docker-compose.yml deleted file mode 100644 index f24c2921..00000000 --- a/packages/plugins-queue-restate/src/tests/docker-compose.yml +++ /dev/null @@ -1,8 +0,0 @@ -services: - restate: - image: docker.restate.dev/restatedev/restate:1.5 - ports: - - "${RESTATE_INGRESS_PORT:-8080}:8080" - - "${RESTATE_ADMIN_PORT:-9070}:9070" - extra_hosts: - - "host.docker.internal:host-gateway" diff --git a/packages/plugins-queue-restate/src/tests/queue.test.ts b/packages/plugins-queue-restate/src/tests/queue.test.ts deleted file mode 100644 index e59d47cb..00000000 --- a/packages/plugins-queue-restate/src/tests/queue.test.ts +++ /dev/null @@ -1,221 +0,0 @@ -import { - afterAll, - afterEach, - beforeAll, - beforeEach, - describe, - expect, - inject, - it, -} from "vitest"; - -import type { Queue, QueueClient } from "@karakeep/shared/queueing"; - -import { AdminClient } from "../admin.js"; -import { RestateQueueProvider } from "../index.js"; -import { waitUntil } from "./utils.js"; - -type TestAction = - | { type: "val"; val: number } - | { type: "err"; err: string } - | { type: "stall"; durSec: number }; - -describe("Restate Queue Provider", () => { - let queueClient: QueueClient; - let queue: Queue; - let adminClient: AdminClient; - - const testState = { - results: [] as number[], - errors: [] as string[], - inFlight: 0, - maxInFlight: 0, - }; - - async function waitUntilQueueEmpty() { - await waitUntil( - async () => { - const stats = await queue.stats(); - return stats.pending + stats.pending_retry + stats.running === 0; - }, - "Queue to be empty", - 60000, - ); - } - - beforeEach(async () => { - testState.results = []; - testState.errors = []; - testState.inFlight = 0; - testState.maxInFlight = 0; - }); - afterEach(async () => { - await waitUntilQueueEmpty(); - }); - - beforeAll(async () => { - const ingressPort = inject("restateIngressPort"); - const adminPort = inject("restateAdminPort"); - - process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; - process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; - process.env.RESTATE_LISTEN_PORT = "9080"; - - const provider = new RestateQueueProvider(); - const client = await provider.getClient(); - - if (!client) { - throw new Error("Failed to create queue client"); - } - - queueClient = client; - adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); - - queue = queueClient.createQueue("test-queue", { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, - }); - - queueClient.createRunner( - queue, - { - run: async (job) => { - testState.inFlight++; - testState.maxInFlight = Math.max( - testState.maxInFlight, - testState.inFlight, - ); - const jobData = job.data; - switch (jobData.type) { - case "val": - testState.results.push(jobData.val); - break; - case "err": - throw new Error(jobData.err); - case "stall": - await new Promise((resolve) => - setTimeout(resolve, jobData.durSec * 1000), - ); - break; - } - }, - onError: async (job) => { - testState.inFlight--; - const jobData = job.data; - if (jobData && jobData.type === "err") { - testState.errors.push(jobData.err); - } - }, - onComplete: async () => { - testState.inFlight--; - }, - }, - { - concurrency: 3, - timeoutSecs: 2, - pollIntervalMs: 0 /* Doesn't matter */, - }, - ); - - await queueClient.prepare(); - await queueClient.start(); - - await adminClient.upsertDeployment("http://host.docker.internal:9080"); - }, 90000); - - afterAll(async () => { - if (queueClient?.shutdown) { - await queueClient.shutdown(); - } - }); - - it("should enqueue and process a job", async () => { - const jobId = await queue.enqueue({ type: "val", val: 42 }); - - expect(jobId).toBeDefined(); - expect(typeof jobId).toBe("string"); - - await waitUntilQueueEmpty(); - - expect(testState.results).toEqual([42]); - }, 60000); - - it("should process multiple jobs", async () => { - await queue.enqueue({ type: "val", val: 1 }); - await queue.enqueue({ type: "val", val: 2 }); - await queue.enqueue({ type: "val", val: 3 }); - - await waitUntilQueueEmpty(); - - expect(testState.results.length).toEqual(3); - expect(testState.results).toContain(1); - expect(testState.results).toContain(2); - expect(testState.results).toContain(3); - }, 60000); - - it("should retry failed jobs", async () => { - await queue.enqueue({ type: "err", err: "Test error" }); - - await waitUntilQueueEmpty(); - - // Initial attempt + 3 retries - expect(testState.errors).toEqual([ - "Test error", - "Test error", - "Test error", - "Test error", - ]); - }, 90000); - - it("should use idempotency key", async () => { - const idempotencyKey = `test-${Date.now()}`; - - await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); - await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); - - await waitUntilQueueEmpty(); - - expect(testState.results).toEqual([200]); - }, 60000); - - it("should handle concurrent jobs", async () => { - const promises = []; - for (let i = 300; i < 320; i++) { - promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); - } - await Promise.all(promises); - - await waitUntilQueueEmpty(); - - expect(testState.maxInFlight).toEqual(3); - }, 60000); - - it("should handle priorities", async () => { - // Hog the queue first - await Promise.all([ - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), - queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), - ]); - - // Then those will get reprioritized - await Promise.all([ - queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), - queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), - queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), - - queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), - queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), - queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), - ]); - - await waitUntilQueueEmpty(); - - expect(testState.results).toEqual([ - // Lower numeric priority value should run first - 202, 201, 200, 300, 301, 302, - ]); - }, 60000); -}); diff --git a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts b/packages/plugins-queue-restate/src/tests/setup/startContainers.ts deleted file mode 100644 index 7d9dea5c..00000000 --- a/packages/plugins-queue-restate/src/tests/setup/startContainers.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { execSync } from "child_process"; -import net from "net"; -import path from "path"; -import { fileURLToPath } from "url"; -import type { GlobalSetupContext } from "vitest/node"; - -import { waitUntil } from "../utils.js"; - -async function getRandomPort(): Promise { - const server = net.createServer(); - return new Promise((resolve, reject) => { - server.unref(); - server.on("error", reject); - server.listen(0, () => { - const port = (server.address() as net.AddressInfo).port; - server.close(() => resolve(port)); - }); - }); -} - -async function waitForHealthy( - ingressPort: number, - adminPort: number, - timeout = 60000, -): Promise { - await waitUntil( - async () => { - const response = await fetch(`http://localhost:${adminPort}/health`); - return response.ok; - }, - "Restate admin API is healthy", - timeout, - ); - - await waitUntil( - async () => { - const response = await fetch( - `http://localhost:${ingressPort}/restate/health`, - ); - return response.ok; - }, - "Restate ingress is healthy", - timeout, - ); -} - -export default async function ({ provide }: GlobalSetupContext) { - const __dirname = path.dirname(fileURLToPath(import.meta.url)); - const ingressPort = await getRandomPort(); - const adminPort = await getRandomPort(); - - console.log( - `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`, - ); - execSync(`docker compose up -d`, { - cwd: path.join(__dirname, ".."), - stdio: "ignore", - env: { - ...process.env, - RESTATE_INGRESS_PORT: ingressPort.toString(), - RESTATE_ADMIN_PORT: adminPort.toString(), - }, - }); - - console.log("Waiting for Restate to become healthy..."); - await waitForHealthy(ingressPort, adminPort); - - provide("restateIngressPort", ingressPort); - provide("restateAdminPort", adminPort); - - process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; - process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; - process.env.RESTATE_LISTEN_PORT = "9080"; - - return async () => { - console.log("Stopping Restate..."); - execSync("docker compose down", { - cwd: path.join(__dirname, ".."), - stdio: "ignore", - }); - return Promise.resolve(); - }; -} - -declare module "vitest" { - export interface ProvidedContext { - restateIngressPort: number; - restateAdminPort: number; - } -} diff --git a/packages/plugins-queue-restate/src/tests/utils.ts b/packages/plugins-queue-restate/src/tests/utils.ts deleted file mode 100644 index e02d2dee..00000000 --- a/packages/plugins-queue-restate/src/tests/utils.ts +++ /dev/null @@ -1,23 +0,0 @@ -export async function waitUntil( - f: () => Promise, - description: string, - timeoutMs = 60000, -): Promise { - const startTime = Date.now(); - - while (Date.now() - startTime < timeoutMs) { - console.log(`Waiting for ${description}...`); - try { - const res = await f(); - if (res) { - console.log(`${description}: success`); - return; - } - } catch (error) { - console.log(`${description}: error, retrying...: ${error}`); - } - await new Promise((resolve) => setTimeout(resolve, 1000)); - } - - throw new Error(`${description}: timeout after ${timeoutMs}ms`); -} diff --git a/packages/plugins-queue-restate/tsconfig.json b/packages/plugins-queue-restate/tsconfig.json deleted file mode 100644 index 3bfa695c..00000000 --- a/packages/plugins-queue-restate/tsconfig.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/tsconfig", - "extends": "@karakeep/tsconfig/node.json", - "include": ["**/*.ts"], - "exclude": ["node_modules"], - "compilerOptions": { - "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" - } -} - diff --git a/packages/plugins-queue-restate/vitest.config.ts b/packages/plugins-queue-restate/vitest.config.ts deleted file mode 100644 index 73e0e1b9..00000000 --- a/packages/plugins-queue-restate/vitest.config.ts +++ /dev/null @@ -1,14 +0,0 @@ -/// - -import tsconfigPaths from "vite-tsconfig-paths"; -import { defineConfig } from "vitest/config"; - -export default defineConfig({ - plugins: [tsconfigPaths()], - test: { - globalSetup: ["./src/tests/setup/startContainers.ts"], - teardownTimeout: 30000, - include: ["src/tests/**/*.test.ts"], - testTimeout: 60000, - }, -}); diff --git a/packages/plugins-search-meilisearch/.oxlintrc.json b/packages/plugins-search-meilisearch/.oxlintrc.json deleted file mode 100644 index 79ba0255..00000000 --- a/packages/plugins-search-meilisearch/.oxlintrc.json +++ /dev/null @@ -1,19 +0,0 @@ -{ - "$schema": "../../node_modules/oxlint/configuration_schema.json", - "extends": [ - "../../tooling/oxlint/oxlint-base.json" - ], - "env": { - "builtin": true, - "commonjs": true - }, - "ignorePatterns": [ - "**/*.config.js", - "**/*.config.cjs", - "**/.eslintrc.cjs", - "**/.next", - "**/dist", - "**/build", - "**/pnpm-lock.yaml" - ] -} diff --git a/packages/plugins-search-meilisearch/index.ts b/packages/plugins-search-meilisearch/index.ts deleted file mode 100644 index 3496d52f..00000000 --- a/packages/plugins-search-meilisearch/index.ts +++ /dev/null @@ -1,12 +0,0 @@ -// Auto-register the MeiliSearch provider when this package is imported -import { PluginManager, PluginType } from "@karakeep/shared/plugins"; - -import { MeiliSearchProvider } from "./src"; - -if (MeiliSearchProvider.isConfigured()) { - PluginManager.register({ - type: PluginType.Search, - name: "MeiliSearch", - provider: new MeiliSearchProvider(), - }); -} diff --git a/packages/plugins-search-meilisearch/package.json b/packages/plugins-search-meilisearch/package.json deleted file mode 100644 index c9482731..00000000 --- a/packages/plugins-search-meilisearch/package.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/package.json", - "name": "@karakeep/plugins-search-meilisearch", - "version": "0.1.0", - "private": true, - "type": "module", - "scripts": { - "typecheck": "tsc --noEmit", - "format": "prettier . --cache --ignore-path ../../.prettierignore --check", - "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", - "lint": "oxlint .", - "lint:fix": "oxlint . --fix", - "test": "vitest" - }, - "dependencies": { - "@karakeep/shared": "workspace:*", - "meilisearch": "^0.45.0" - }, - "devDependencies": { - "@karakeep/prettier-config": "workspace:^0.1.0", - "@karakeep/tsconfig": "workspace:^0.1.0", - "vite-tsconfig-paths": "^4.3.1", - "vitest": "^3.2.4" - }, - "prettier": "@karakeep/prettier-config" -} diff --git a/packages/plugins-search-meilisearch/src/env.ts b/packages/plugins-search-meilisearch/src/env.ts deleted file mode 100644 index c06fdd55..00000000 --- a/packages/plugins-search-meilisearch/src/env.ts +++ /dev/null @@ -1,8 +0,0 @@ -import { z } from "zod"; - -export const envConfig = z - .object({ - MEILI_ADDR: z.string().optional(), - MEILI_MASTER_KEY: z.string().default(""), - }) - .parse(process.env); diff --git a/packages/plugins-search-meilisearch/src/index.ts b/packages/plugins-search-meilisearch/src/index.ts deleted file mode 100644 index 30da4a64..00000000 --- a/packages/plugins-search-meilisearch/src/index.ts +++ /dev/null @@ -1,159 +0,0 @@ -import type { Index } from "meilisearch"; -import { MeiliSearch } from "meilisearch"; - -import type { - BookmarkSearchDocument, - FilterQuery, - SearchIndexClient, - SearchOptions, - SearchResponse, -} from "@karakeep/shared/search"; -import serverConfig from "@karakeep/shared/config"; -import { PluginProvider } from "@karakeep/shared/plugins"; - -import { envConfig } from "./env"; - -function filterToMeiliSearchFilter(filter: FilterQuery): string { - switch (filter.type) { - case "eq": - return `${filter.field} = "${filter.value}"`; - case "in": - return `${filter.field} IN [${filter.values.join(",")}]`; - default: { - const exhaustiveCheck: never = filter; - throw new Error(`Unhandled color case: ${exhaustiveCheck}`); - } - } -} - -class MeiliSearchIndexClient implements SearchIndexClient { - constructor(private index: Index) {} - - async addDocuments(documents: BookmarkSearchDocument[]): Promise { - const task = await this.index.addDocuments(documents, { - primaryKey: "id", - }); - await this.ensureTaskSuccess(task.taskUid); - } - - async deleteDocuments(ids: string[]): Promise { - const task = await this.index.deleteDocuments(ids); - await this.ensureTaskSuccess(task.taskUid); - } - - async search(options: SearchOptions): Promise { - const result = await this.index.search(options.query, { - filter: options.filter?.map((f) => filterToMeiliSearchFilter(f)), - limit: options.limit, - offset: options.offset, - sort: options.sort?.map((s) => `${s.field}:${s.order}`), - attributesToRetrieve: ["id"], - showRankingScore: true, - }); - - return { - hits: result.hits.map((hit) => ({ - id: hit.id, - score: hit._rankingScore, - })), - totalHits: result.estimatedTotalHits ?? 0, - processingTimeMs: result.processingTimeMs, - }; - } - - async clearIndex(): Promise { - const task = await this.index.deleteAllDocuments(); - await this.ensureTaskSuccess(task.taskUid); - } - - private async ensureTaskSuccess(taskUid: number): Promise { - const task = await this.index.waitForTask(taskUid, { - intervalMs: 200, - timeOutMs: serverConfig.search.jobTimeoutSec * 1000 * 0.9, - }); - if (task.error) { - throw new Error(`Search task failed: ${task.error.message}`); - } - } -} - -export class MeiliSearchProvider implements PluginProvider { - private client: MeiliSearch | undefined; - private indexClient: SearchIndexClient | undefined; - private readonly indexName = "bookmarks"; - - constructor() { - if (MeiliSearchProvider.isConfigured()) { - this.client = new MeiliSearch({ - host: envConfig.MEILI_ADDR!, - apiKey: envConfig.MEILI_MASTER_KEY, - }); - } - } - - static isConfigured(): boolean { - return !!envConfig.MEILI_ADDR; - } - - async getClient(): Promise { - if (this.indexClient) { - return this.indexClient; - } - - if (!this.client) { - return null; - } - - const indices = await this.client.getIndexes(); - let indexFound = indices.results.find((i) => i.uid === this.indexName); - - if (!indexFound) { - const idx = await this.client.createIndex(this.indexName, { - primaryKey: "id", - }); - await this.client.waitForTask(idx.taskUid); - indexFound = await this.client.getIndex( - this.indexName, - ); - } - - await this.configureIndex(indexFound); - this.indexClient = new MeiliSearchIndexClient(indexFound); - return this.indexClient; - } - - private async configureIndex( - index: Index, - ): Promise { - const desiredFilterableAttributes = ["id", "userId"].sort(); - const desiredSortableAttributes = ["createdAt"].sort(); - - const settings = await index.getSettings(); - - if ( - JSON.stringify(settings.filterableAttributes?.sort()) !== - JSON.stringify(desiredFilterableAttributes) - ) { - console.log( - `[meilisearch] Updating desired filterable attributes to ${desiredFilterableAttributes} from ${settings.filterableAttributes}`, - ); - const taskId = await index.updateFilterableAttributes( - desiredFilterableAttributes, - ); - await this.client!.waitForTask(taskId.taskUid); - } - - if ( - JSON.stringify(settings.sortableAttributes?.sort()) !== - JSON.stringify(desiredSortableAttributes) - ) { - console.log( - `[meilisearch] Updating desired sortable attributes to ${desiredSortableAttributes} from ${settings.sortableAttributes}`, - ); - const taskId = await index.updateSortableAttributes( - desiredSortableAttributes, - ); - await this.client!.waitForTask(taskId.taskUid); - } - } -} diff --git a/packages/plugins-search-meilisearch/tsconfig.json b/packages/plugins-search-meilisearch/tsconfig.json deleted file mode 100644 index a795b96a..00000000 --- a/packages/plugins-search-meilisearch/tsconfig.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/tsconfig", - "extends": "@karakeep/tsconfig/node.json", - "include": ["**/*.ts"], - "exclude": ["node_modules"], - "compilerOptions": { - "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" - } -} diff --git a/packages/plugins/.oxlintrc.json b/packages/plugins/.oxlintrc.json new file mode 100644 index 00000000..79ba0255 --- /dev/null +++ b/packages/plugins/.oxlintrc.json @@ -0,0 +1,19 @@ +{ + "$schema": "../../node_modules/oxlint/configuration_schema.json", + "extends": [ + "../../tooling/oxlint/oxlint-base.json" + ], + "env": { + "builtin": true, + "commonjs": true + }, + "ignorePatterns": [ + "**/*.config.js", + "**/*.config.cjs", + "**/.eslintrc.cjs", + "**/.next", + "**/dist", + "**/build", + "**/pnpm-lock.yaml" + ] +} diff --git a/packages/plugins/package.json b/packages/plugins/package.json new file mode 100644 index 00000000..8b3f73f7 --- /dev/null +++ b/packages/plugins/package.json @@ -0,0 +1,34 @@ +{ + "$schema": "https://json.schemastore.org/package.json", + "name": "@karakeep/plugins", + "version": "0.1.0", + "private": true, + "type": "module", + "exports": { + "./queue-liteque": "./queue-liteque/index.ts", + "./queue-restate": "./queue-restate/index.ts", + "./search-meilisearch": "./search-meilisearch/index.ts" + }, + "scripts": { + "typecheck": "tsc --noEmit", + "format": "prettier . --cache --ignore-path ../../.prettierignore --check", + "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", + "lint": "oxlint .", + "lint:fix": "oxlint . --fix", + "test": "vitest" + }, + "dependencies": { + "@karakeep/shared": "workspace:*", + "@restatedev/restate-sdk": "^1.9.0", + "@restatedev/restate-sdk-clients": "^1.9.0", + "liteque": "^0.6.2", + "meilisearch": "^0.45.0" + }, + "devDependencies": { + "@karakeep/prettier-config": "workspace:^0.1.0", + "@karakeep/tsconfig": "workspace:^0.1.0", + "vite-tsconfig-paths": "^4.3.1", + "vitest": "^3.2.4" + }, + "prettier": "@karakeep/prettier-config" +} diff --git a/packages/plugins/queue-liteque/index.ts b/packages/plugins/queue-liteque/index.ts new file mode 100644 index 00000000..c3f7f03b --- /dev/null +++ b/packages/plugins/queue-liteque/index.ts @@ -0,0 +1,10 @@ +// Auto-register the Liteque queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { LitequeQueueProvider } from "./src"; + +PluginManager.register({ + type: PluginType.Queue, + name: "Liteque", + provider: new LitequeQueueProvider(), +}); diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts new file mode 100644 index 00000000..ddc2181c --- /dev/null +++ b/packages/plugins/queue-liteque/src/index.ts @@ -0,0 +1,137 @@ +import path from "node:path"; +import { + buildDBClient, + SqliteQueue as LQ, + Runner as LQRunner, + migrateDB, +} from "liteque"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + DequeuedJob, + DequeuedJobError, + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import serverConfig from "@karakeep/shared/config"; + +class LitequeQueueWrapper implements Queue { + constructor( + private readonly _name: string, + private readonly lq: LQ, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise { + const job = await this.lq.enqueue(payload, options); + // liteque returns a Job with numeric id + return job ? String(job.id) : undefined; + } + + async stats() { + return this.lq.stats(); + } + + async cancelAllNonRunning(): Promise { + return this.lq.cancelAllNonRunning(); + } + + // Internal accessor for runner + get _impl(): LQ { + return this.lq; + } +} + +class LitequeQueueClient implements QueueClient { + private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { + walEnabled: serverConfig.database.walMode, + }); + + private queues = new Map>(); + + async prepare(): Promise { + migrateDB(this.db); + } + + async start(): Promise { + // No-op for sqlite + } + + createQueue(name: string, options: QueueOptions): Queue { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const lq = new LQ(name, this.db, { + defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, + keepFailedJobs: options.keepFailedJobs, + }); + const wrapper = new LitequeQueueWrapper(name, lq, options); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner { + const name = queue.name(); + let wrapper = this.queues.get(name); + if (!wrapper) { + throw new Error(`Queue ${name} not found`); + } + + const runner = new LQRunner( + wrapper._impl, + { + run: funcs.run, + onComplete: funcs.onComplete as + | ((job: DequeuedJob) => Promise) + | undefined, + onError: funcs.onError as + | ((job: DequeuedJobError) => Promise) + | undefined, + }, + { + pollIntervalMs: opts.pollIntervalMs ?? 1000, + timeoutSecs: opts.timeoutSecs, + concurrency: opts.concurrency, + validator: opts.validator, + }, + ); + + return { + run: () => runner.run(), + stop: () => runner.stop(), + runUntilEmpty: () => runner.runUntilEmpty(), + }; + } + + async shutdown(): Promise { + // No-op for sqlite + } +} + +export class LitequeQueueProvider implements PluginProvider { + private client: QueueClient | null = null; + + async getClient(): Promise { + if (!this.client) { + const client = new LitequeQueueClient(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins/queue-restate/index.ts b/packages/plugins/queue-restate/index.ts new file mode 100644 index 00000000..d313615c --- /dev/null +++ b/packages/plugins/queue-restate/index.ts @@ -0,0 +1,12 @@ +// Auto-register the Restate queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { RestateQueueProvider } from "./src"; + +if (RestateQueueProvider.isConfigured()) { + PluginManager.register({ + type: PluginType.Queue, + name: "Restate", + provider: new RestateQueueProvider(), + }); +} diff --git a/packages/plugins/queue-restate/src/admin.ts b/packages/plugins/queue-restate/src/admin.ts new file mode 100644 index 00000000..dddc8f00 --- /dev/null +++ b/packages/plugins/queue-restate/src/admin.ts @@ -0,0 +1,75 @@ +import { z } from "zod"; + +export class AdminClient { + constructor(private addr: string) {} + + async upsertDeployment(deploymentAddr: string) { + const res = await fetch(`${this.addr}/deployments`, { + method: "POST", + body: JSON.stringify({ + uri: deploymentAddr, + force: true, + }), + headers: { + "Content-Type": "application/json", + }, + }); + + if (!res.ok) { + throw new Error(`Failed to upsert deployment: ${res.status}`); + } + } + + async getStats(serviceName: string) { + const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`; + const res = await fetch(`${this.addr}/query`, { + method: "POST", + body: JSON.stringify({ + query, + }), + headers: { + "Content-Type": "application/json", + Accept: "application/json", + }, + }); + + if (!res.ok) { + throw new Error(`Failed to get stats: ${res.status}`); + } + const zStatus = z.enum([ + "pending", + "scheduled", + "ready", + "running", + "paused", + "backing-off", + "suspended", + "completed", + ]); + const zSchema = z.object({ + rows: z.array( + z.object({ + status: zStatus, + count: z.number(), + }), + ), + }); + + return zSchema.parse(await res.json()).rows.reduce( + (acc, cur) => { + acc[cur.status] = cur.count; + return acc; + }, + { + pending: 0, + scheduled: 0, + ready: 0, + running: 0, + paused: 0, + "backing-off": 0, + suspended: 0, + completed: 0, + } as Record, number>, + ); + } +} diff --git a/packages/plugins/queue-restate/src/env.ts b/packages/plugins/queue-restate/src/env.ts new file mode 100644 index 00000000..01175e86 --- /dev/null +++ b/packages/plugins/queue-restate/src/env.ts @@ -0,0 +1,13 @@ +import { z } from "zod"; + +export const envConfig = z + .object({ + RESTATE_LISTEN_PORT: z.coerce.number().optional(), + RESTATE_INGRESS_ADDR: z + .string() + .optional() + .default("http://localhost:8080"), + RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"), + RESTATE_PUB_KEY: z.string().optional(), + }) + .parse(process.env); diff --git a/packages/plugins/queue-restate/src/idProvider.ts b/packages/plugins/queue-restate/src/idProvider.ts new file mode 100644 index 00000000..ee85f46f --- /dev/null +++ b/packages/plugins/queue-restate/src/idProvider.ts @@ -0,0 +1,21 @@ +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +export const idProvider = object({ + name: "IdProvider", + handlers: { + get: async (ctx: ObjectContext<{ nextId: number }>): Promise => { + const state = (await ctx.get("nextId")) ?? 0; + ctx.set("nextId", state + 1); + return state; + }, + }, + options: { + ingressPrivate: true, + }, +}); + +export async function genId(ctx: Context) { + return ctx + .objectClient({ name: "IdProvider" }, "IdProvider") + .get(); +} diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts new file mode 100644 index 00000000..bedc26af --- /dev/null +++ b/packages/plugins/queue-restate/src/index.ts @@ -0,0 +1,201 @@ +import * as restate from "@restatedev/restate-sdk"; +import * as restateClient from "@restatedev/restate-sdk-clients"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import logger from "@karakeep/shared/logger"; + +import { AdminClient } from "./admin"; +import { envConfig } from "./env"; +import { idProvider } from "./idProvider"; +import { semaphore } from "./semaphore"; +import { buildRestateService } from "./service"; + +class RestateQueueWrapper implements Queue { + constructor( + private readonly _name: string, + private readonly client: restateClient.Ingress, + private readonly adminClient: AdminClient, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise { + interface MyService { + run: ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => Promise; + } + const cl = this.client.serviceSendClient({ name: this.name() }); + const res = await cl.run( + { + payload, + priority: options?.priority ?? 0, + }, + restateClient.rpc.sendOpts({ + delay: options?.delayMs + ? { + milliseconds: options.delayMs, + } + : undefined, + idempotencyKey: options?.idempotencyKey, + }), + ); + return res.invocationId; + } + + async stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }> { + const res = await this.adminClient.getStats(this.name()); + return { + pending: res.pending + res.ready, + pending_retry: res["backing-off"] + res.paused + res.suspended, + running: res.running, + failed: 0, + }; + } + + async cancelAllNonRunning(): Promise { + throw new Error("Method not implemented."); + } +} + +class RestateRunnerWrapper implements Runner { + constructor( + private readonly wf: restate.ServiceDefinition< + string, + { + run: (ctx: restate.Context, data: T) => Promise; + } + >, + ) {} + + async run(): Promise { + // No-op for restate + } + + async stop(): Promise { + // No-op for restate + } + + async runUntilEmpty(): Promise { + throw new Error("Method not implemented."); + } + + get def(): restate.WorkflowDefinition { + return this.wf; + } +} + +class RestateQueueClient implements QueueClient { + private client: restateClient.Ingress; + private adminClient: AdminClient; + private queues = new Map>(); + private services = new Map>(); + + constructor() { + this.client = restateClient.connect({ + url: envConfig.RESTATE_INGRESS_ADDR, + }); + this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); + } + + async prepare(): Promise { + // No-op for restate + } + + async start(): Promise { + const port = await restate.serve({ + port: envConfig.RESTATE_LISTEN_PORT ?? 0, + services: [ + ...[...this.services.values()].map((svc) => svc.def), + semaphore, + idProvider, + ], + identityKeys: envConfig.RESTATE_PUB_KEY + ? [envConfig.RESTATE_PUB_KEY] + : undefined, + logger: (meta, msg) => { + if (meta.context) { + // No need to log invocation logs + } else { + logger.log(meta.level, `[restate] ${msg}`); + } + }, + }); + logger.info(`Restate listening on port ${port}`); + } + + createQueue(name: string, opts: QueueOptions): Queue { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const wrapper = new RestateQueueWrapper( + name, + this.client, + this.adminClient, + opts, + ); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner { + const name = queue.name(); + let wrapper = this.services.get(name); + if (wrapper) { + throw new Error(`Queue ${name} already exists`); + } + const svc = new RestateRunnerWrapper( + buildRestateService(queue, funcs, opts, queue.opts), + ); + this.services.set(name, svc); + return svc; + } + + async shutdown(): Promise { + // No-op for sqlite + } +} + +export class RestateQueueProvider implements PluginProvider { + private client: QueueClient | null = null; + + static isConfigured(): boolean { + return envConfig.RESTATE_LISTEN_PORT !== undefined; + } + + async getClient(): Promise { + if (!this.client) { + const client = new RestateQueueClient(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts new file mode 100644 index 00000000..ad636f98 --- /dev/null +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -0,0 +1,109 @@ +// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts + +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +interface QueueItem { + awakeable: string; + priority: number; +} + +interface QueueState { + items: QueueItem[]; + inFlight: number; +} + +export const semaphore = object({ + name: "Semaphore", + handlers: { + acquire: async ( + ctx: ObjectContext, + req: { awakeableId: string; priority: number; capacity: number }, + ): Promise => { + const state = await getState(ctx); + + state.items.push({ + awakeable: req.awakeableId, + priority: req.priority, + }); + + tick(ctx, state, req.capacity); + + setState(ctx, state); + }, + + release: async ( + ctx: ObjectContext, + capacity: number, + ): Promise => { + const state = await getState(ctx); + state.inFlight--; + tick(ctx, state, capacity); + setState(ctx, state); + }, + }, + options: { + ingressPrivate: true, + }, +}); + +// Lower numbers represent higher priority, mirroring Liteque’s semantics. +function selectAndPopItem(items: QueueItem[]): QueueItem { + let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; + for (const [i, item] of items.entries()) { + if (item.priority < selected.priority) { + selected.priority = item.priority; + selected.index = i; + } + } + const [item] = items.splice(selected.index, 1); + return item; +} + +function tick( + ctx: ObjectContext, + state: QueueState, + capacity: number, +) { + while (state.inFlight < capacity && state.items.length > 0) { + const item = selectAndPopItem(state.items); + state.inFlight++; + ctx.resolveAwakeable(item.awakeable); + } +} + +async function getState(ctx: ObjectContext): Promise { + return { + items: (await ctx.get("items")) ?? [], + inFlight: (await ctx.get("inFlight")) ?? 0, + }; +} + +function setState(ctx: ObjectContext, state: QueueState) { + ctx.set("items", state.items); + ctx.set("inFlight", state.inFlight); +} + +export class RestateSemaphore { + constructor( + private readonly ctx: Context, + private readonly id: string, + private readonly capacity: number, + ) {} + + async acquire(priority: number) { + const awk = this.ctx.awakeable(); + await this.ctx + .objectClient({ name: "Semaphore" }, this.id) + .acquire({ + awakeableId: awk.id, + priority, + capacity: this.capacity, + }); + await awk.promise; + } + async release() { + await this.ctx + .objectClient({ name: "Semaphore" }, this.id) + .release(this.capacity); + } +} diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts new file mode 100644 index 00000000..de5b070f --- /dev/null +++ b/packages/plugins/queue-restate/src/service.ts @@ -0,0 +1,133 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { + Queue, + QueueOptions, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import { genId } from "./idProvider"; +import { RestateSemaphore } from "./semaphore"; + +export function buildRestateService( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + queueOpts: QueueOptions, +) { + const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; + return restate.service({ + name: queue.name(), + options: { + inactivityTimeout: { + seconds: opts.timeoutSecs, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => { + const id = `${await genId(ctx)}`; + let payload = data.payload; + if (opts.validator) { + const res = opts.validator.safeParse(data.payload); + if (!res.success) { + throw new restate.TerminalError(res.error.message, { + errorCode: 400, + }); + } + payload = res.data; + } + + const priority = data.priority ?? 0; + + const semaphore = new RestateSemaphore( + ctx, + `queue:${queue.name()}`, + opts.concurrency, + ); + + let lastError: Error | undefined; + for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + await semaphore.acquire(priority); + const res = await runWorkerLogic(ctx, funcs, { + id, + data: payload, + priority, + runNumber, + numRetriesLeft: NUM_RETRIES - runNumber, + abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), + }); + await semaphore.release(); + if (res.error) { + lastError = res.error; + // TODO: add backoff + await ctx.sleep(1000); + } else { + break; + } + } + if (lastError) { + throw new restate.TerminalError(lastError.message, { + errorCode: 500, + cause: "cause" in lastError ? lastError.cause : undefined, + }); + } + }, + }, + }); +} + +async function runWorkerLogic( + ctx: restate.Context, + { run, onError, onComplete }: RunnerFuncs, + data: { + id: string; + data: T; + priority: number; + runNumber: number; + numRetriesLeft: number; + abortSignal: AbortSignal; + }, +) { + const res = await tryCatch( + ctx.run( + `main logic`, + async () => { + await run(data); + }, + { + maxRetryAttempts: 1, + }, + ), + ); + if (res.error) { + await tryCatch( + ctx.run( + `onError`, + async () => + onError?.({ + ...data, + error: res.error, + }), + { + maxRetryAttempts: 1, + }, + ), + ); + return res; + } + + await tryCatch( + ctx.run("onComplete", async () => await onComplete?.(data), { + maxRetryAttempts: 1, + }), + ); + return res; +} diff --git a/packages/plugins/queue-restate/src/tests/docker-compose.yml b/packages/plugins/queue-restate/src/tests/docker-compose.yml new file mode 100644 index 00000000..f24c2921 --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/docker-compose.yml @@ -0,0 +1,8 @@ +services: + restate: + image: docker.restate.dev/restatedev/restate:1.5 + ports: + - "${RESTATE_INGRESS_PORT:-8080}:8080" + - "${RESTATE_ADMIN_PORT:-9070}:9070" + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts new file mode 100644 index 00000000..e59d47cb --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -0,0 +1,221 @@ +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + inject, + it, +} from "vitest"; + +import type { Queue, QueueClient } from "@karakeep/shared/queueing"; + +import { AdminClient } from "../admin.js"; +import { RestateQueueProvider } from "../index.js"; +import { waitUntil } from "./utils.js"; + +type TestAction = + | { type: "val"; val: number } + | { type: "err"; err: string } + | { type: "stall"; durSec: number }; + +describe("Restate Queue Provider", () => { + let queueClient: QueueClient; + let queue: Queue; + let adminClient: AdminClient; + + const testState = { + results: [] as number[], + errors: [] as string[], + inFlight: 0, + maxInFlight: 0, + }; + + async function waitUntilQueueEmpty() { + await waitUntil( + async () => { + const stats = await queue.stats(); + return stats.pending + stats.pending_retry + stats.running === 0; + }, + "Queue to be empty", + 60000, + ); + } + + beforeEach(async () => { + testState.results = []; + testState.errors = []; + testState.inFlight = 0; + testState.maxInFlight = 0; + }); + afterEach(async () => { + await waitUntilQueueEmpty(); + }); + + beforeAll(async () => { + const ingressPort = inject("restateIngressPort"); + const adminPort = inject("restateAdminPort"); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + const provider = new RestateQueueProvider(); + const client = await provider.getClient(); + + if (!client) { + throw new Error("Failed to create queue client"); + } + + queueClient = client; + adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); + + queue = queueClient.createQueue("test-queue", { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }); + + queueClient.createRunner( + queue, + { + run: async (job) => { + testState.inFlight++; + testState.maxInFlight = Math.max( + testState.maxInFlight, + testState.inFlight, + ); + const jobData = job.data; + switch (jobData.type) { + case "val": + testState.results.push(jobData.val); + break; + case "err": + throw new Error(jobData.err); + case "stall": + await new Promise((resolve) => + setTimeout(resolve, jobData.durSec * 1000), + ); + break; + } + }, + onError: async (job) => { + testState.inFlight--; + const jobData = job.data; + if (jobData && jobData.type === "err") { + testState.errors.push(jobData.err); + } + }, + onComplete: async () => { + testState.inFlight--; + }, + }, + { + concurrency: 3, + timeoutSecs: 2, + pollIntervalMs: 0 /* Doesn't matter */, + }, + ); + + await queueClient.prepare(); + await queueClient.start(); + + await adminClient.upsertDeployment("http://host.docker.internal:9080"); + }, 90000); + + afterAll(async () => { + if (queueClient?.shutdown) { + await queueClient.shutdown(); + } + }); + + it("should enqueue and process a job", async () => { + const jobId = await queue.enqueue({ type: "val", val: 42 }); + + expect(jobId).toBeDefined(); + expect(typeof jobId).toBe("string"); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([42]); + }, 60000); + + it("should process multiple jobs", async () => { + await queue.enqueue({ type: "val", val: 1 }); + await queue.enqueue({ type: "val", val: 2 }); + await queue.enqueue({ type: "val", val: 3 }); + + await waitUntilQueueEmpty(); + + expect(testState.results.length).toEqual(3); + expect(testState.results).toContain(1); + expect(testState.results).toContain(2); + expect(testState.results).toContain(3); + }, 60000); + + it("should retry failed jobs", async () => { + await queue.enqueue({ type: "err", err: "Test error" }); + + await waitUntilQueueEmpty(); + + // Initial attempt + 3 retries + expect(testState.errors).toEqual([ + "Test error", + "Test error", + "Test error", + "Test error", + ]); + }, 90000); + + it("should use idempotency key", async () => { + const idempotencyKey = `test-${Date.now()}`; + + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([200]); + }, 60000); + + it("should handle concurrent jobs", async () => { + const promises = []; + for (let i = 300; i < 320; i++) { + promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); + } + await Promise.all(promises); + + await waitUntilQueueEmpty(); + + expect(testState.maxInFlight).toEqual(3); + }, 60000); + + it("should handle priorities", async () => { + // Hog the queue first + await Promise.all([ + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), + ]); + + // Then those will get reprioritized + await Promise.all([ + queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), + queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), + queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), + + queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), + queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), + queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), + ]); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([ + // Lower numeric priority value should run first + 202, 201, 200, 300, 301, 302, + ]); + }, 60000); +}); diff --git a/packages/plugins/queue-restate/src/tests/setup/startContainers.ts b/packages/plugins/queue-restate/src/tests/setup/startContainers.ts new file mode 100644 index 00000000..7d9dea5c --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/setup/startContainers.ts @@ -0,0 +1,90 @@ +import { execSync } from "child_process"; +import net from "net"; +import path from "path"; +import { fileURLToPath } from "url"; +import type { GlobalSetupContext } from "vitest/node"; + +import { waitUntil } from "../utils.js"; + +async function getRandomPort(): Promise { + const server = net.createServer(); + return new Promise((resolve, reject) => { + server.unref(); + server.on("error", reject); + server.listen(0, () => { + const port = (server.address() as net.AddressInfo).port; + server.close(() => resolve(port)); + }); + }); +} + +async function waitForHealthy( + ingressPort: number, + adminPort: number, + timeout = 60000, +): Promise { + await waitUntil( + async () => { + const response = await fetch(`http://localhost:${adminPort}/health`); + return response.ok; + }, + "Restate admin API is healthy", + timeout, + ); + + await waitUntil( + async () => { + const response = await fetch( + `http://localhost:${ingressPort}/restate/health`, + ); + return response.ok; + }, + "Restate ingress is healthy", + timeout, + ); +} + +export default async function ({ provide }: GlobalSetupContext) { + const __dirname = path.dirname(fileURLToPath(import.meta.url)); + const ingressPort = await getRandomPort(); + const adminPort = await getRandomPort(); + + console.log( + `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`, + ); + execSync(`docker compose up -d`, { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + env: { + ...process.env, + RESTATE_INGRESS_PORT: ingressPort.toString(), + RESTATE_ADMIN_PORT: adminPort.toString(), + }, + }); + + console.log("Waiting for Restate to become healthy..."); + await waitForHealthy(ingressPort, adminPort); + + provide("restateIngressPort", ingressPort); + provide("restateAdminPort", adminPort); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + return async () => { + console.log("Stopping Restate..."); + execSync("docker compose down", { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + }); + return Promise.resolve(); + }; +} + +declare module "vitest" { + export interface ProvidedContext { + restateIngressPort: number; + restateAdminPort: number; + } +} diff --git a/packages/plugins/queue-restate/src/tests/utils.ts b/packages/plugins/queue-restate/src/tests/utils.ts new file mode 100644 index 00000000..e02d2dee --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/utils.ts @@ -0,0 +1,23 @@ +export async function waitUntil( + f: () => Promise, + description: string, + timeoutMs = 60000, +): Promise { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + console.log(`Waiting for ${description}...`); + try { + const res = await f(); + if (res) { + console.log(`${description}: success`); + return; + } + } catch (error) { + console.log(`${description}: error, retrying...: ${error}`); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + throw new Error(`${description}: timeout after ${timeoutMs}ms`); +} diff --git a/packages/plugins/search-meilisearch/index.ts b/packages/plugins/search-meilisearch/index.ts new file mode 100644 index 00000000..3496d52f --- /dev/null +++ b/packages/plugins/search-meilisearch/index.ts @@ -0,0 +1,12 @@ +// Auto-register the MeiliSearch provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { MeiliSearchProvider } from "./src"; + +if (MeiliSearchProvider.isConfigured()) { + PluginManager.register({ + type: PluginType.Search, + name: "MeiliSearch", + provider: new MeiliSearchProvider(), + }); +} diff --git a/packages/plugins/search-meilisearch/src/env.ts b/packages/plugins/search-meilisearch/src/env.ts new file mode 100644 index 00000000..c06fdd55 --- /dev/null +++ b/packages/plugins/search-meilisearch/src/env.ts @@ -0,0 +1,8 @@ +import { z } from "zod"; + +export const envConfig = z + .object({ + MEILI_ADDR: z.string().optional(), + MEILI_MASTER_KEY: z.string().default(""), + }) + .parse(process.env); diff --git a/packages/plugins/search-meilisearch/src/index.ts b/packages/plugins/search-meilisearch/src/index.ts new file mode 100644 index 00000000..30da4a64 --- /dev/null +++ b/packages/plugins/search-meilisearch/src/index.ts @@ -0,0 +1,159 @@ +import type { Index } from "meilisearch"; +import { MeiliSearch } from "meilisearch"; + +import type { + BookmarkSearchDocument, + FilterQuery, + SearchIndexClient, + SearchOptions, + SearchResponse, +} from "@karakeep/shared/search"; +import serverConfig from "@karakeep/shared/config"; +import { PluginProvider } from "@karakeep/shared/plugins"; + +import { envConfig } from "./env"; + +function filterToMeiliSearchFilter(filter: FilterQuery): string { + switch (filter.type) { + case "eq": + return `${filter.field} = "${filter.value}"`; + case "in": + return `${filter.field} IN [${filter.values.join(",")}]`; + default: { + const exhaustiveCheck: never = filter; + throw new Error(`Unhandled color case: ${exhaustiveCheck}`); + } + } +} + +class MeiliSearchIndexClient implements SearchIndexClient { + constructor(private index: Index) {} + + async addDocuments(documents: BookmarkSearchDocument[]): Promise { + const task = await this.index.addDocuments(documents, { + primaryKey: "id", + }); + await this.ensureTaskSuccess(task.taskUid); + } + + async deleteDocuments(ids: string[]): Promise { + const task = await this.index.deleteDocuments(ids); + await this.ensureTaskSuccess(task.taskUid); + } + + async search(options: SearchOptions): Promise { + const result = await this.index.search(options.query, { + filter: options.filter?.map((f) => filterToMeiliSearchFilter(f)), + limit: options.limit, + offset: options.offset, + sort: options.sort?.map((s) => `${s.field}:${s.order}`), + attributesToRetrieve: ["id"], + showRankingScore: true, + }); + + return { + hits: result.hits.map((hit) => ({ + id: hit.id, + score: hit._rankingScore, + })), + totalHits: result.estimatedTotalHits ?? 0, + processingTimeMs: result.processingTimeMs, + }; + } + + async clearIndex(): Promise { + const task = await this.index.deleteAllDocuments(); + await this.ensureTaskSuccess(task.taskUid); + } + + private async ensureTaskSuccess(taskUid: number): Promise { + const task = await this.index.waitForTask(taskUid, { + intervalMs: 200, + timeOutMs: serverConfig.search.jobTimeoutSec * 1000 * 0.9, + }); + if (task.error) { + throw new Error(`Search task failed: ${task.error.message}`); + } + } +} + +export class MeiliSearchProvider implements PluginProvider { + private client: MeiliSearch | undefined; + private indexClient: SearchIndexClient | undefined; + private readonly indexName = "bookmarks"; + + constructor() { + if (MeiliSearchProvider.isConfigured()) { + this.client = new MeiliSearch({ + host: envConfig.MEILI_ADDR!, + apiKey: envConfig.MEILI_MASTER_KEY, + }); + } + } + + static isConfigured(): boolean { + return !!envConfig.MEILI_ADDR; + } + + async getClient(): Promise { + if (this.indexClient) { + return this.indexClient; + } + + if (!this.client) { + return null; + } + + const indices = await this.client.getIndexes(); + let indexFound = indices.results.find((i) => i.uid === this.indexName); + + if (!indexFound) { + const idx = await this.client.createIndex(this.indexName, { + primaryKey: "id", + }); + await this.client.waitForTask(idx.taskUid); + indexFound = await this.client.getIndex( + this.indexName, + ); + } + + await this.configureIndex(indexFound); + this.indexClient = new MeiliSearchIndexClient(indexFound); + return this.indexClient; + } + + private async configureIndex( + index: Index, + ): Promise { + const desiredFilterableAttributes = ["id", "userId"].sort(); + const desiredSortableAttributes = ["createdAt"].sort(); + + const settings = await index.getSettings(); + + if ( + JSON.stringify(settings.filterableAttributes?.sort()) !== + JSON.stringify(desiredFilterableAttributes) + ) { + console.log( + `[meilisearch] Updating desired filterable attributes to ${desiredFilterableAttributes} from ${settings.filterableAttributes}`, + ); + const taskId = await index.updateFilterableAttributes( + desiredFilterableAttributes, + ); + await this.client!.waitForTask(taskId.taskUid); + } + + if ( + JSON.stringify(settings.sortableAttributes?.sort()) !== + JSON.stringify(desiredSortableAttributes) + ) { + console.log( + `[meilisearch] Updating desired sortable attributes to ${desiredSortableAttributes} from ${settings.sortableAttributes}`, + ); + const taskId = await index.updateSortableAttributes( + desiredSortableAttributes, + ); + await this.client!.waitForTask(taskId.taskUid); + } + } +} diff --git a/packages/plugins/tsconfig.json b/packages/plugins/tsconfig.json new file mode 100644 index 00000000..a795b96a --- /dev/null +++ b/packages/plugins/tsconfig.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "extends": "@karakeep/tsconfig/node.json", + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "compilerOptions": { + "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" + } +} diff --git a/packages/plugins/vitest.config.ts b/packages/plugins/vitest.config.ts new file mode 100644 index 00000000..3d5f33f7 --- /dev/null +++ b/packages/plugins/vitest.config.ts @@ -0,0 +1,14 @@ +/// + +import tsconfigPaths from "vite-tsconfig-paths"; +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + plugins: [tsconfigPaths()], + test: { + globalSetup: ["./queue-restate/src/tests/setup/startContainers.ts"], + teardownTimeout: 30000, + include: ["**/src/tests/**/*.test.ts"], + testTimeout: 60000, + }, +}); diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json index d18eb4a0..578c3330 100644 --- a/packages/shared-server/package.json +++ b/packages/shared-server/package.json @@ -6,9 +6,7 @@ "type": "module", "dependencies": { "@karakeep/db": "workspace:^0.1.0", - "@karakeep/plugins-queue-liteque": "workspace:^0.1.0", - "@karakeep/plugins-queue-restate": "workspace:^0.1.0", - "@karakeep/plugins-search-meilisearch": "workspace:^0.1.0", + "@karakeep/plugins": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0" }, "devDependencies": { diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts index de08b9b0..97503403 100644 --- a/packages/shared-server/src/plugins.ts +++ b/packages/shared-server/src/plugins.ts @@ -7,9 +7,9 @@ export async function loadAllPlugins() { } // Load plugins here. Order of plugin loading matter. // Queue provider(s) - await import("@karakeep/plugins-queue-liteque"); - await import("@karakeep/plugins-queue-restate"); - await import("@karakeep/plugins-search-meilisearch"); + await import("@karakeep/plugins/queue-liteque"); + await import("@karakeep/plugins/queue-restate"); + await import("@karakeep/plugins/search-meilisearch"); PluginManager.logAllPlugins(); pluginsLoaded = true; } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a95e9699..4d89fc27 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1148,29 +1148,7 @@ importers: specifier: ^4.8.1 version: 4.20.3 - packages/plugins-queue-liteque: - dependencies: - '@karakeep/shared': - specifier: workspace:* - version: link:../shared - liteque: - specifier: ^0.6.2 - version: 0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) - devDependencies: - '@karakeep/prettier-config': - specifier: workspace:^0.1.0 - version: link:../../tooling/prettier - '@karakeep/tsconfig': - specifier: workspace:^0.1.0 - version: link:../../tooling/typescript - vite-tsconfig-paths: - specifier: ^4.3.1 - version: 4.3.2(typescript@5.8.3)(vite@7.0.6(@types/node@22.15.30)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0)) - vitest: - specifier: ^3.2.4 - version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.30)(happy-dom@20.0.8)(jiti@2.4.2)(jsdom@27.0.1(postcss@8.5.6))(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0) - - packages/plugins-queue-restate: + packages/plugins: dependencies: '@karakeep/shared': specifier: workspace:* @@ -1181,25 +1159,9 @@ importers: '@restatedev/restate-sdk-clients': specifier: ^1.9.0 version: 1.9.0 - devDependencies: - '@karakeep/prettier-config': - specifier: workspace:^0.1.0 - version: link:../../tooling/prettier - '@karakeep/tsconfig': - specifier: workspace:^0.1.0 - version: link:../../tooling/typescript - vite-tsconfig-paths: - specifier: ^4.3.1 - version: 4.3.2(typescript@5.8.3)(vite@7.0.6(@types/node@22.15.30)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0)) - vitest: - specifier: ^3.2.4 - version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.30)(happy-dom@20.0.8)(jiti@2.4.2)(jsdom@27.0.1(postcss@8.5.6))(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0) - - packages/plugins-search-meilisearch: - dependencies: - '@karakeep/shared': - specifier: workspace:* - version: link:../shared + liteque: + specifier: ^0.6.2 + version: 0.6.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.2.2)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) meilisearch: specifier: ^0.45.0 version: 0.45.0 @@ -1336,15 +1298,9 @@ importers: '@karakeep/db': specifier: workspace:^0.1.0 version: link:../db - '@karakeep/plugins-queue-liteque': - specifier: workspace:^0.1.0 - version: link:../plugins-queue-liteque - '@karakeep/plugins-queue-restate': + '@karakeep/plugins': specifier: workspace:^0.1.0 - version: link:../plugins-queue-restate - '@karakeep/plugins-search-meilisearch': - specifier: workspace:^0.1.0 - version: link:../plugins-search-meilisearch + version: link:../plugins '@karakeep/shared': specifier: workspace:^0.1.0 version: link:../shared @@ -15699,7 +15655,7 @@ snapshots: '@babel/helper-member-expression-to-functions@7.27.1': dependencies: '@babel/traverse': 7.28.0 - '@babel/types': 7.28.1 + '@babel/types': 7.28.5 transitivePeerDependencies: - supports-color @@ -15730,7 +15686,7 @@ snapshots: '@babel/helper-optimise-call-expression@7.27.1': dependencies: - '@babel/types': 7.28.1 + '@babel/types': 7.28.5 '@babel/helper-plugin-utils@7.27.1': {} @@ -21179,7 +21135,7 @@ snapshots: '@types/react-router@5.1.20': dependencies: '@types/history': 4.7.11 - '@types/react': 19.1.8 + '@types/react': 19.2.2 '@types/react-syntax-highlighter@15.5.13': dependencies: @@ -21359,7 +21315,7 @@ snapshots: '@vue/compiler-core@3.5.16': dependencies: - '@babel/parser': 7.28.0 + '@babel/parser': 7.28.5 '@vue/shared': 3.5.16 entities: 4.5.0 estree-walker: 2.0.2 @@ -21814,7 +21770,7 @@ snapshots: babel-plugin-jest-hoist@29.6.3: dependencies: '@babel/template': 7.27.2 - '@babel/types': 7.28.1 + '@babel/types': 7.28.5 '@types/babel__core': 7.20.5 '@types/babel__traverse': 7.20.7 @@ -24698,7 +24654,7 @@ snapshots: history@4.10.1: dependencies: - '@babel/runtime': 7.27.6 + '@babel/runtime': 7.28.4 loose-envify: 1.4.0 resolve-pathname: 3.0.0 tiny-invariant: 1.3.3 @@ -25255,7 +25211,7 @@ snapshots: istanbul-lib-instrument@5.2.1: dependencies: '@babel/core': 7.26.0 - '@babel/parser': 7.28.0 + '@babel/parser': 7.28.5 '@istanbuljs/schema': 0.1.3 istanbul-lib-coverage: 3.2.2 semver: 6.3.1 @@ -26449,7 +26405,7 @@ snapshots: metro-file-map@0.82.5: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3 fb-watchman: 2.0.2 flow-enums-runtime: 0.0.6 graceful-fs: 4.2.11 @@ -26504,7 +26460,7 @@ snapshots: metro-transform-plugins@0.82.5: dependencies: '@babel/core': 7.26.0 - '@babel/generator': 7.28.0 + '@babel/generator': 7.28.5 '@babel/template': 7.27.2 '@babel/traverse': 7.28.0 flow-enums-runtime: 0.0.6 @@ -26515,9 +26471,9 @@ snapshots: metro-transform-worker@0.82.5: dependencies: '@babel/core': 7.26.0 - '@babel/generator': 7.28.0 - '@babel/parser': 7.28.0 - '@babel/types': 7.28.1 + '@babel/generator': 7.28.5 + '@babel/parser': 7.28.5 + '@babel/types': 7.28.5 flow-enums-runtime: 0.0.6 metro: 0.82.5 metro-babel-transformer: 0.82.5 -- cgit v1.2.3-70-g09d2