diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-11-08 14:50:00 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-08 14:50:00 +0000 |
| commit | 99413db0e79a156a1b87eacd3c6a7b83e9df946e (patch) | |
| tree | 73f0a5fceb507f75f662a109b00beeb3fa6b16fb /packages/plugins | |
| parent | 737b03172c2e063ba311c23d6552418bd2ab1955 (diff) | |
| download | karakeep-99413db0e79a156a1b87eacd3c6a7b83e9df946e.tar.zst | |
refactor: consolidate multiple karakeep plugins into one package (#2101)
* refactor: consolidate plugin packages into single plugins directory
- Create new `packages/plugins` directory with consolidated package.json
- Move queue-liteque, queue-restate, and search-meilisearch to subdirectories
- Update imports in packages/shared-server/src/plugins.ts
- Remove individual plugin package directories
- Update shared-server dependency to use @karakeep/plugins
This reduces overhead of maintaining multiple separate packages for plugins.
* refactor: consolidate plugin config files to root level
- Move .oxlintrc.json to packages/plugins root
- Move vitest.config.ts to packages/plugins root
- Update vitest config paths to work from root
- Remove individual config files from plugin subdirectories
This reduces configuration duplication across plugin subdirectories.
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages/plugins')
20 files changed, 1308 insertions, 0 deletions
diff --git a/packages/plugins/.oxlintrc.json b/packages/plugins/.oxlintrc.json new file mode 100644 index 00000000..79ba0255 --- /dev/null +++ b/packages/plugins/.oxlintrc.json @@ -0,0 +1,19 @@ +{ + "$schema": "../../node_modules/oxlint/configuration_schema.json", + "extends": [ + "../../tooling/oxlint/oxlint-base.json" + ], + "env": { + "builtin": true, + "commonjs": true + }, + "ignorePatterns": [ + "**/*.config.js", + "**/*.config.cjs", + "**/.eslintrc.cjs", + "**/.next", + "**/dist", + "**/build", + "**/pnpm-lock.yaml" + ] +} diff --git a/packages/plugins/package.json b/packages/plugins/package.json new file mode 100644 index 00000000..8b3f73f7 --- /dev/null +++ b/packages/plugins/package.json @@ -0,0 +1,34 @@ +{ + "$schema": "https://json.schemastore.org/package.json", + "name": "@karakeep/plugins", + "version": "0.1.0", + "private": true, + "type": "module", + "exports": { + "./queue-liteque": "./queue-liteque/index.ts", + "./queue-restate": "./queue-restate/index.ts", + "./search-meilisearch": "./search-meilisearch/index.ts" + }, + "scripts": { + "typecheck": "tsc --noEmit", + "format": "prettier . --cache --ignore-path ../../.prettierignore --check", + "format:fix": "prettier . --cache --ignore-path ../../.prettierignore --write", + "lint": "oxlint .", + "lint:fix": "oxlint . --fix", + "test": "vitest" + }, + "dependencies": { + "@karakeep/shared": "workspace:*", + "@restatedev/restate-sdk": "^1.9.0", + "@restatedev/restate-sdk-clients": "^1.9.0", + "liteque": "^0.6.2", + "meilisearch": "^0.45.0" + }, + "devDependencies": { + "@karakeep/prettier-config": "workspace:^0.1.0", + "@karakeep/tsconfig": "workspace:^0.1.0", + "vite-tsconfig-paths": "^4.3.1", + "vitest": "^3.2.4" + }, + "prettier": "@karakeep/prettier-config" +} diff --git a/packages/plugins/queue-liteque/index.ts b/packages/plugins/queue-liteque/index.ts new file mode 100644 index 00000000..c3f7f03b --- /dev/null +++ b/packages/plugins/queue-liteque/index.ts @@ -0,0 +1,10 @@ +// Auto-register the Liteque queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { LitequeQueueProvider } from "./src"; + +PluginManager.register({ + type: PluginType.Queue, + name: "Liteque", + provider: new LitequeQueueProvider(), +}); diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts new file mode 100644 index 00000000..ddc2181c --- /dev/null +++ b/packages/plugins/queue-liteque/src/index.ts @@ -0,0 +1,137 @@ +import path from "node:path"; +import { + buildDBClient, + SqliteQueue as LQ, + Runner as LQRunner, + migrateDB, +} from "liteque"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + DequeuedJob, + DequeuedJobError, + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import serverConfig from "@karakeep/shared/config"; + +class LitequeQueueWrapper<T> implements Queue<T> { + constructor( + private readonly _name: string, + private readonly lq: LQ<T>, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise<string | undefined> { + const job = await this.lq.enqueue(payload, options); + // liteque returns a Job with numeric id + return job ? String(job.id) : undefined; + } + + async stats() { + return this.lq.stats(); + } + + async cancelAllNonRunning(): Promise<number> { + return this.lq.cancelAllNonRunning(); + } + + // Internal accessor for runner + get _impl(): LQ<T> { + return this.lq; + } +} + +class LitequeQueueClient implements QueueClient { + private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { + walEnabled: serverConfig.database.walMode, + }); + + private queues = new Map<string, LitequeQueueWrapper<unknown>>(); + + async prepare(): Promise<void> { + migrateDB(this.db); + } + + async start(): Promise<void> { + // No-op for sqlite + } + + createQueue<T>(name: string, options: QueueOptions): Queue<T> { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const lq = new LQ<T>(name, this.db, { + defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, + keepFailedJobs: options.keepFailedJobs, + }); + const wrapper = new LitequeQueueWrapper<T>(name, lq, options); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + ): Runner<T> { + const name = queue.name(); + let wrapper = this.queues.get(name); + if (!wrapper) { + throw new Error(`Queue ${name} not found`); + } + + const runner = new LQRunner<T>( + wrapper._impl, + { + run: funcs.run, + onComplete: funcs.onComplete as + | ((job: DequeuedJob<T>) => Promise<void>) + | undefined, + onError: funcs.onError as + | ((job: DequeuedJobError<T>) => Promise<void>) + | undefined, + }, + { + pollIntervalMs: opts.pollIntervalMs ?? 1000, + timeoutSecs: opts.timeoutSecs, + concurrency: opts.concurrency, + validator: opts.validator, + }, + ); + + return { + run: () => runner.run(), + stop: () => runner.stop(), + runUntilEmpty: () => runner.runUntilEmpty(), + }; + } + + async shutdown(): Promise<void> { + // No-op for sqlite + } +} + +export class LitequeQueueProvider implements PluginProvider<QueueClient> { + private client: QueueClient | null = null; + + async getClient(): Promise<QueueClient | null> { + if (!this.client) { + const client = new LitequeQueueClient(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins/queue-restate/index.ts b/packages/plugins/queue-restate/index.ts new file mode 100644 index 00000000..d313615c --- /dev/null +++ b/packages/plugins/queue-restate/index.ts @@ -0,0 +1,12 @@ +// Auto-register the Restate queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { RestateQueueProvider } from "./src"; + +if (RestateQueueProvider.isConfigured()) { + PluginManager.register({ + type: PluginType.Queue, + name: "Restate", + provider: new RestateQueueProvider(), + }); +} diff --git a/packages/plugins/queue-restate/src/admin.ts b/packages/plugins/queue-restate/src/admin.ts new file mode 100644 index 00000000..dddc8f00 --- /dev/null +++ b/packages/plugins/queue-restate/src/admin.ts @@ -0,0 +1,75 @@ +import { z } from "zod"; + +export class AdminClient { + constructor(private addr: string) {} + + async upsertDeployment(deploymentAddr: string) { + const res = await fetch(`${this.addr}/deployments`, { + method: "POST", + body: JSON.stringify({ + uri: deploymentAddr, + force: true, + }), + headers: { + "Content-Type": "application/json", + }, + }); + + if (!res.ok) { + throw new Error(`Failed to upsert deployment: ${res.status}`); + } + } + + async getStats(serviceName: string) { + const query = `select status, count(*) as count from sys_invocation where target_service_name='${serviceName}' group by status`; + const res = await fetch(`${this.addr}/query`, { + method: "POST", + body: JSON.stringify({ + query, + }), + headers: { + "Content-Type": "application/json", + Accept: "application/json", + }, + }); + + if (!res.ok) { + throw new Error(`Failed to get stats: ${res.status}`); + } + const zStatus = z.enum([ + "pending", + "scheduled", + "ready", + "running", + "paused", + "backing-off", + "suspended", + "completed", + ]); + const zSchema = z.object({ + rows: z.array( + z.object({ + status: zStatus, + count: z.number(), + }), + ), + }); + + return zSchema.parse(await res.json()).rows.reduce( + (acc, cur) => { + acc[cur.status] = cur.count; + return acc; + }, + { + pending: 0, + scheduled: 0, + ready: 0, + running: 0, + paused: 0, + "backing-off": 0, + suspended: 0, + completed: 0, + } as Record<z.infer<typeof zStatus>, number>, + ); + } +} diff --git a/packages/plugins/queue-restate/src/env.ts b/packages/plugins/queue-restate/src/env.ts new file mode 100644 index 00000000..01175e86 --- /dev/null +++ b/packages/plugins/queue-restate/src/env.ts @@ -0,0 +1,13 @@ +import { z } from "zod"; + +export const envConfig = z + .object({ + RESTATE_LISTEN_PORT: z.coerce.number().optional(), + RESTATE_INGRESS_ADDR: z + .string() + .optional() + .default("http://localhost:8080"), + RESTATE_ADMIN_ADDR: z.string().optional().default("http://localhost:9070"), + RESTATE_PUB_KEY: z.string().optional(), + }) + .parse(process.env); diff --git a/packages/plugins/queue-restate/src/idProvider.ts b/packages/plugins/queue-restate/src/idProvider.ts new file mode 100644 index 00000000..ee85f46f --- /dev/null +++ b/packages/plugins/queue-restate/src/idProvider.ts @@ -0,0 +1,21 @@ +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +export const idProvider = object({ + name: "IdProvider", + handlers: { + get: async (ctx: ObjectContext<{ nextId: number }>): Promise<number> => { + const state = (await ctx.get("nextId")) ?? 0; + ctx.set("nextId", state + 1); + return state; + }, + }, + options: { + ingressPrivate: true, + }, +}); + +export async function genId(ctx: Context) { + return ctx + .objectClient<typeof idProvider>({ name: "IdProvider" }, "IdProvider") + .get(); +} diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts new file mode 100644 index 00000000..bedc26af --- /dev/null +++ b/packages/plugins/queue-restate/src/index.ts @@ -0,0 +1,201 @@ +import * as restate from "@restatedev/restate-sdk"; +import * as restateClient from "@restatedev/restate-sdk-clients"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import logger from "@karakeep/shared/logger"; + +import { AdminClient } from "./admin"; +import { envConfig } from "./env"; +import { idProvider } from "./idProvider"; +import { semaphore } from "./semaphore"; +import { buildRestateService } from "./service"; + +class RestateQueueWrapper<T> implements Queue<T> { + constructor( + private readonly _name: string, + private readonly client: restateClient.Ingress, + private readonly adminClient: AdminClient, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise<string | undefined> { + interface MyService { + run: ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => Promise<void>; + } + const cl = this.client.serviceSendClient<MyService>({ name: this.name() }); + const res = await cl.run( + { + payload, + priority: options?.priority ?? 0, + }, + restateClient.rpc.sendOpts({ + delay: options?.delayMs + ? { + milliseconds: options.delayMs, + } + : undefined, + idempotencyKey: options?.idempotencyKey, + }), + ); + return res.invocationId; + } + + async stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }> { + const res = await this.adminClient.getStats(this.name()); + return { + pending: res.pending + res.ready, + pending_retry: res["backing-off"] + res.paused + res.suspended, + running: res.running, + failed: 0, + }; + } + + async cancelAllNonRunning(): Promise<number> { + throw new Error("Method not implemented."); + } +} + +class RestateRunnerWrapper<T> implements Runner<T> { + constructor( + private readonly wf: restate.ServiceDefinition< + string, + { + run: (ctx: restate.Context, data: T) => Promise<void>; + } + >, + ) {} + + async run(): Promise<void> { + // No-op for restate + } + + async stop(): Promise<void> { + // No-op for restate + } + + async runUntilEmpty(): Promise<void> { + throw new Error("Method not implemented."); + } + + get def(): restate.WorkflowDefinition<string, unknown> { + return this.wf; + } +} + +class RestateQueueClient implements QueueClient { + private client: restateClient.Ingress; + private adminClient: AdminClient; + private queues = new Map<string, RestateQueueWrapper<unknown>>(); + private services = new Map<string, RestateRunnerWrapper<unknown>>(); + + constructor() { + this.client = restateClient.connect({ + url: envConfig.RESTATE_INGRESS_ADDR, + }); + this.adminClient = new AdminClient(envConfig.RESTATE_ADMIN_ADDR); + } + + async prepare(): Promise<void> { + // No-op for restate + } + + async start(): Promise<void> { + const port = await restate.serve({ + port: envConfig.RESTATE_LISTEN_PORT ?? 0, + services: [ + ...[...this.services.values()].map((svc) => svc.def), + semaphore, + idProvider, + ], + identityKeys: envConfig.RESTATE_PUB_KEY + ? [envConfig.RESTATE_PUB_KEY] + : undefined, + logger: (meta, msg) => { + if (meta.context) { + // No need to log invocation logs + } else { + logger.log(meta.level, `[restate] ${msg}`); + } + }, + }); + logger.info(`Restate listening on port ${port}`); + } + + createQueue<T>(name: string, opts: QueueOptions): Queue<T> { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const wrapper = new RestateQueueWrapper<T>( + name, + this.client, + this.adminClient, + opts, + ); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + ): Runner<T> { + const name = queue.name(); + let wrapper = this.services.get(name); + if (wrapper) { + throw new Error(`Queue ${name} already exists`); + } + const svc = new RestateRunnerWrapper<T>( + buildRestateService(queue, funcs, opts, queue.opts), + ); + this.services.set(name, svc); + return svc; + } + + async shutdown(): Promise<void> { + // No-op for sqlite + } +} + +export class RestateQueueProvider implements PluginProvider<QueueClient> { + private client: QueueClient | null = null; + + static isConfigured(): boolean { + return envConfig.RESTATE_LISTEN_PORT !== undefined; + } + + async getClient(): Promise<QueueClient | null> { + if (!this.client) { + const client = new RestateQueueClient(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins/queue-restate/src/semaphore.ts b/packages/plugins/queue-restate/src/semaphore.ts new file mode 100644 index 00000000..ad636f98 --- /dev/null +++ b/packages/plugins/queue-restate/src/semaphore.ts @@ -0,0 +1,109 @@ +// Inspired from https://github.com/restatedev/examples/blob/main/typescript/patterns-use-cases/src/priorityqueue/queue.ts + +import { Context, object, ObjectContext } from "@restatedev/restate-sdk"; + +interface QueueItem { + awakeable: string; + priority: number; +} + +interface QueueState { + items: QueueItem[]; + inFlight: number; +} + +export const semaphore = object({ + name: "Semaphore", + handlers: { + acquire: async ( + ctx: ObjectContext<QueueState>, + req: { awakeableId: string; priority: number; capacity: number }, + ): Promise<void> => { + const state = await getState(ctx); + + state.items.push({ + awakeable: req.awakeableId, + priority: req.priority, + }); + + tick(ctx, state, req.capacity); + + setState(ctx, state); + }, + + release: async ( + ctx: ObjectContext<QueueState>, + capacity: number, + ): Promise<void> => { + const state = await getState(ctx); + state.inFlight--; + tick(ctx, state, capacity); + setState(ctx, state); + }, + }, + options: { + ingressPrivate: true, + }, +}); + +// Lower numbers represent higher priority, mirroring Liteque’s semantics. +function selectAndPopItem(items: QueueItem[]): QueueItem { + let selected = { priority: Number.MAX_SAFE_INTEGER, index: 0 }; + for (const [i, item] of items.entries()) { + if (item.priority < selected.priority) { + selected.priority = item.priority; + selected.index = i; + } + } + const [item] = items.splice(selected.index, 1); + return item; +} + +function tick( + ctx: ObjectContext<QueueState>, + state: QueueState, + capacity: number, +) { + while (state.inFlight < capacity && state.items.length > 0) { + const item = selectAndPopItem(state.items); + state.inFlight++; + ctx.resolveAwakeable(item.awakeable); + } +} + +async function getState(ctx: ObjectContext<QueueState>): Promise<QueueState> { + return { + items: (await ctx.get("items")) ?? [], + inFlight: (await ctx.get("inFlight")) ?? 0, + }; +} + +function setState(ctx: ObjectContext<QueueState>, state: QueueState) { + ctx.set("items", state.items); + ctx.set("inFlight", state.inFlight); +} + +export class RestateSemaphore { + constructor( + private readonly ctx: Context, + private readonly id: string, + private readonly capacity: number, + ) {} + + async acquire(priority: number) { + const awk = this.ctx.awakeable(); + await this.ctx + .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) + .acquire({ + awakeableId: awk.id, + priority, + capacity: this.capacity, + }); + await awk.promise; + } + async release() { + await this.ctx + .objectClient<typeof semaphore>({ name: "Semaphore" }, this.id) + .release(this.capacity); + } +} diff --git a/packages/plugins/queue-restate/src/service.ts b/packages/plugins/queue-restate/src/service.ts new file mode 100644 index 00000000..de5b070f --- /dev/null +++ b/packages/plugins/queue-restate/src/service.ts @@ -0,0 +1,133 @@ +import * as restate from "@restatedev/restate-sdk"; + +import type { + Queue, + QueueOptions, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import { genId } from "./idProvider"; +import { RestateSemaphore } from "./semaphore"; + +export function buildRestateService<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + queueOpts: QueueOptions, +) { + const NUM_RETRIES = queueOpts.defaultJobArgs.numRetries; + return restate.service({ + name: queue.name(), + options: { + inactivityTimeout: { + seconds: opts.timeoutSecs, + }, + }, + handlers: { + run: async ( + ctx: restate.Context, + data: { + payload: T; + priority: number; + }, + ) => { + const id = `${await genId(ctx)}`; + let payload = data.payload; + if (opts.validator) { + const res = opts.validator.safeParse(data.payload); + if (!res.success) { + throw new restate.TerminalError(res.error.message, { + errorCode: 400, + }); + } + payload = res.data; + } + + const priority = data.priority ?? 0; + + const semaphore = new RestateSemaphore( + ctx, + `queue:${queue.name()}`, + opts.concurrency, + ); + + let lastError: Error | undefined; + for (let runNumber = 0; runNumber <= NUM_RETRIES; runNumber++) { + await semaphore.acquire(priority); + const res = await runWorkerLogic(ctx, funcs, { + id, + data: payload, + priority, + runNumber, + numRetriesLeft: NUM_RETRIES - runNumber, + abortSignal: AbortSignal.timeout(opts.timeoutSecs * 1000), + }); + await semaphore.release(); + if (res.error) { + lastError = res.error; + // TODO: add backoff + await ctx.sleep(1000); + } else { + break; + } + } + if (lastError) { + throw new restate.TerminalError(lastError.message, { + errorCode: 500, + cause: "cause" in lastError ? lastError.cause : undefined, + }); + } + }, + }, + }); +} + +async function runWorkerLogic<T>( + ctx: restate.Context, + { run, onError, onComplete }: RunnerFuncs<T>, + data: { + id: string; + data: T; + priority: number; + runNumber: number; + numRetriesLeft: number; + abortSignal: AbortSignal; + }, +) { + const res = await tryCatch( + ctx.run( + `main logic`, + async () => { + await run(data); + }, + { + maxRetryAttempts: 1, + }, + ), + ); + if (res.error) { + await tryCatch( + ctx.run( + `onError`, + async () => + onError?.({ + ...data, + error: res.error, + }), + { + maxRetryAttempts: 1, + }, + ), + ); + return res; + } + + await tryCatch( + ctx.run("onComplete", async () => await onComplete?.(data), { + maxRetryAttempts: 1, + }), + ); + return res; +} diff --git a/packages/plugins/queue-restate/src/tests/docker-compose.yml b/packages/plugins/queue-restate/src/tests/docker-compose.yml new file mode 100644 index 00000000..f24c2921 --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/docker-compose.yml @@ -0,0 +1,8 @@ +services: + restate: + image: docker.restate.dev/restatedev/restate:1.5 + ports: + - "${RESTATE_INGRESS_PORT:-8080}:8080" + - "${RESTATE_ADMIN_PORT:-9070}:9070" + extra_hosts: + - "host.docker.internal:host-gateway" diff --git a/packages/plugins/queue-restate/src/tests/queue.test.ts b/packages/plugins/queue-restate/src/tests/queue.test.ts new file mode 100644 index 00000000..e59d47cb --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/queue.test.ts @@ -0,0 +1,221 @@ +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + inject, + it, +} from "vitest"; + +import type { Queue, QueueClient } from "@karakeep/shared/queueing"; + +import { AdminClient } from "../admin.js"; +import { RestateQueueProvider } from "../index.js"; +import { waitUntil } from "./utils.js"; + +type TestAction = + | { type: "val"; val: number } + | { type: "err"; err: string } + | { type: "stall"; durSec: number }; + +describe("Restate Queue Provider", () => { + let queueClient: QueueClient; + let queue: Queue<TestAction>; + let adminClient: AdminClient; + + const testState = { + results: [] as number[], + errors: [] as string[], + inFlight: 0, + maxInFlight: 0, + }; + + async function waitUntilQueueEmpty() { + await waitUntil( + async () => { + const stats = await queue.stats(); + return stats.pending + stats.pending_retry + stats.running === 0; + }, + "Queue to be empty", + 60000, + ); + } + + beforeEach(async () => { + testState.results = []; + testState.errors = []; + testState.inFlight = 0; + testState.maxInFlight = 0; + }); + afterEach(async () => { + await waitUntilQueueEmpty(); + }); + + beforeAll(async () => { + const ingressPort = inject("restateIngressPort"); + const adminPort = inject("restateAdminPort"); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + const provider = new RestateQueueProvider(); + const client = await provider.getClient(); + + if (!client) { + throw new Error("Failed to create queue client"); + } + + queueClient = client; + adminClient = new AdminClient(process.env.RESTATE_ADMIN_ADDR); + + queue = queueClient.createQueue<TestAction>("test-queue", { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }); + + queueClient.createRunner( + queue, + { + run: async (job) => { + testState.inFlight++; + testState.maxInFlight = Math.max( + testState.maxInFlight, + testState.inFlight, + ); + const jobData = job.data; + switch (jobData.type) { + case "val": + testState.results.push(jobData.val); + break; + case "err": + throw new Error(jobData.err); + case "stall": + await new Promise((resolve) => + setTimeout(resolve, jobData.durSec * 1000), + ); + break; + } + }, + onError: async (job) => { + testState.inFlight--; + const jobData = job.data; + if (jobData && jobData.type === "err") { + testState.errors.push(jobData.err); + } + }, + onComplete: async () => { + testState.inFlight--; + }, + }, + { + concurrency: 3, + timeoutSecs: 2, + pollIntervalMs: 0 /* Doesn't matter */, + }, + ); + + await queueClient.prepare(); + await queueClient.start(); + + await adminClient.upsertDeployment("http://host.docker.internal:9080"); + }, 90000); + + afterAll(async () => { + if (queueClient?.shutdown) { + await queueClient.shutdown(); + } + }); + + it("should enqueue and process a job", async () => { + const jobId = await queue.enqueue({ type: "val", val: 42 }); + + expect(jobId).toBeDefined(); + expect(typeof jobId).toBe("string"); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([42]); + }, 60000); + + it("should process multiple jobs", async () => { + await queue.enqueue({ type: "val", val: 1 }); + await queue.enqueue({ type: "val", val: 2 }); + await queue.enqueue({ type: "val", val: 3 }); + + await waitUntilQueueEmpty(); + + expect(testState.results.length).toEqual(3); + expect(testState.results).toContain(1); + expect(testState.results).toContain(2); + expect(testState.results).toContain(3); + }, 60000); + + it("should retry failed jobs", async () => { + await queue.enqueue({ type: "err", err: "Test error" }); + + await waitUntilQueueEmpty(); + + // Initial attempt + 3 retries + expect(testState.errors).toEqual([ + "Test error", + "Test error", + "Test error", + "Test error", + ]); + }, 90000); + + it("should use idempotency key", async () => { + const idempotencyKey = `test-${Date.now()}`; + + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + await queue.enqueue({ type: "val", val: 200 }, { idempotencyKey }); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([200]); + }, 60000); + + it("should handle concurrent jobs", async () => { + const promises = []; + for (let i = 300; i < 320; i++) { + promises.push(queue.enqueue({ type: "stall", durSec: 0.1 })); + } + await Promise.all(promises); + + await waitUntilQueueEmpty(); + + expect(testState.maxInFlight).toEqual(3); + }, 60000); + + it("should handle priorities", async () => { + // Hog the queue first + await Promise.all([ + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 0 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 1 }), + queue.enqueue({ type: "stall", durSec: 1 }, { priority: 2 }), + ]); + + // Then those will get reprioritized + await Promise.all([ + queue.enqueue({ type: "val", val: 200 }, { priority: -1 }), + queue.enqueue({ type: "val", val: 201 }, { priority: -2 }), + queue.enqueue({ type: "val", val: 202 }, { priority: -3 }), + + queue.enqueue({ type: "val", val: 300 }, { priority: 0 }), + queue.enqueue({ type: "val", val: 301 }, { priority: 1 }), + queue.enqueue({ type: "val", val: 302 }, { priority: 2 }), + ]); + + await waitUntilQueueEmpty(); + + expect(testState.results).toEqual([ + // Lower numeric priority value should run first + 202, 201, 200, 300, 301, 302, + ]); + }, 60000); +}); diff --git a/packages/plugins/queue-restate/src/tests/setup/startContainers.ts b/packages/plugins/queue-restate/src/tests/setup/startContainers.ts new file mode 100644 index 00000000..7d9dea5c --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/setup/startContainers.ts @@ -0,0 +1,90 @@ +import { execSync } from "child_process"; +import net from "net"; +import path from "path"; +import { fileURLToPath } from "url"; +import type { GlobalSetupContext } from "vitest/node"; + +import { waitUntil } from "../utils.js"; + +async function getRandomPort(): Promise<number> { + const server = net.createServer(); + return new Promise<number>((resolve, reject) => { + server.unref(); + server.on("error", reject); + server.listen(0, () => { + const port = (server.address() as net.AddressInfo).port; + server.close(() => resolve(port)); + }); + }); +} + +async function waitForHealthy( + ingressPort: number, + adminPort: number, + timeout = 60000, +): Promise<void> { + await waitUntil( + async () => { + const response = await fetch(`http://localhost:${adminPort}/health`); + return response.ok; + }, + "Restate admin API is healthy", + timeout, + ); + + await waitUntil( + async () => { + const response = await fetch( + `http://localhost:${ingressPort}/restate/health`, + ); + return response.ok; + }, + "Restate ingress is healthy", + timeout, + ); +} + +export default async function ({ provide }: GlobalSetupContext) { + const __dirname = path.dirname(fileURLToPath(import.meta.url)); + const ingressPort = await getRandomPort(); + const adminPort = await getRandomPort(); + + console.log( + `Starting Restate on ports ${ingressPort} (ingress) and ${adminPort} (admin)...`, + ); + execSync(`docker compose up -d`, { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + env: { + ...process.env, + RESTATE_INGRESS_PORT: ingressPort.toString(), + RESTATE_ADMIN_PORT: adminPort.toString(), + }, + }); + + console.log("Waiting for Restate to become healthy..."); + await waitForHealthy(ingressPort, adminPort); + + provide("restateIngressPort", ingressPort); + provide("restateAdminPort", adminPort); + + process.env.RESTATE_INGRESS_ADDR = `http://localhost:${ingressPort}`; + process.env.RESTATE_ADMIN_ADDR = `http://localhost:${adminPort}`; + process.env.RESTATE_LISTEN_PORT = "9080"; + + return async () => { + console.log("Stopping Restate..."); + execSync("docker compose down", { + cwd: path.join(__dirname, ".."), + stdio: "ignore", + }); + return Promise.resolve(); + }; +} + +declare module "vitest" { + export interface ProvidedContext { + restateIngressPort: number; + restateAdminPort: number; + } +} diff --git a/packages/plugins/queue-restate/src/tests/utils.ts b/packages/plugins/queue-restate/src/tests/utils.ts new file mode 100644 index 00000000..e02d2dee --- /dev/null +++ b/packages/plugins/queue-restate/src/tests/utils.ts @@ -0,0 +1,23 @@ +export async function waitUntil( + f: () => Promise<boolean>, + description: string, + timeoutMs = 60000, +): Promise<void> { + const startTime = Date.now(); + + while (Date.now() - startTime < timeoutMs) { + console.log(`Waiting for ${description}...`); + try { + const res = await f(); + if (res) { + console.log(`${description}: success`); + return; + } + } catch (error) { + console.log(`${description}: error, retrying...: ${error}`); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + + throw new Error(`${description}: timeout after ${timeoutMs}ms`); +} diff --git a/packages/plugins/search-meilisearch/index.ts b/packages/plugins/search-meilisearch/index.ts new file mode 100644 index 00000000..3496d52f --- /dev/null +++ b/packages/plugins/search-meilisearch/index.ts @@ -0,0 +1,12 @@ +// Auto-register the MeiliSearch provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { MeiliSearchProvider } from "./src"; + +if (MeiliSearchProvider.isConfigured()) { + PluginManager.register({ + type: PluginType.Search, + name: "MeiliSearch", + provider: new MeiliSearchProvider(), + }); +} diff --git a/packages/plugins/search-meilisearch/src/env.ts b/packages/plugins/search-meilisearch/src/env.ts new file mode 100644 index 00000000..c06fdd55 --- /dev/null +++ b/packages/plugins/search-meilisearch/src/env.ts @@ -0,0 +1,8 @@ +import { z } from "zod"; + +export const envConfig = z + .object({ + MEILI_ADDR: z.string().optional(), + MEILI_MASTER_KEY: z.string().default(""), + }) + .parse(process.env); diff --git a/packages/plugins/search-meilisearch/src/index.ts b/packages/plugins/search-meilisearch/src/index.ts new file mode 100644 index 00000000..30da4a64 --- /dev/null +++ b/packages/plugins/search-meilisearch/src/index.ts @@ -0,0 +1,159 @@ +import type { Index } from "meilisearch"; +import { MeiliSearch } from "meilisearch"; + +import type { + BookmarkSearchDocument, + FilterQuery, + SearchIndexClient, + SearchOptions, + SearchResponse, +} from "@karakeep/shared/search"; +import serverConfig from "@karakeep/shared/config"; +import { PluginProvider } from "@karakeep/shared/plugins"; + +import { envConfig } from "./env"; + +function filterToMeiliSearchFilter(filter: FilterQuery): string { + switch (filter.type) { + case "eq": + return `${filter.field} = "${filter.value}"`; + case "in": + return `${filter.field} IN [${filter.values.join(",")}]`; + default: { + const exhaustiveCheck: never = filter; + throw new Error(`Unhandled color case: ${exhaustiveCheck}`); + } + } +} + +class MeiliSearchIndexClient implements SearchIndexClient { + constructor(private index: Index<BookmarkSearchDocument>) {} + + async addDocuments(documents: BookmarkSearchDocument[]): Promise<void> { + const task = await this.index.addDocuments(documents, { + primaryKey: "id", + }); + await this.ensureTaskSuccess(task.taskUid); + } + + async deleteDocuments(ids: string[]): Promise<void> { + const task = await this.index.deleteDocuments(ids); + await this.ensureTaskSuccess(task.taskUid); + } + + async search(options: SearchOptions): Promise<SearchResponse> { + const result = await this.index.search(options.query, { + filter: options.filter?.map((f) => filterToMeiliSearchFilter(f)), + limit: options.limit, + offset: options.offset, + sort: options.sort?.map((s) => `${s.field}:${s.order}`), + attributesToRetrieve: ["id"], + showRankingScore: true, + }); + + return { + hits: result.hits.map((hit) => ({ + id: hit.id, + score: hit._rankingScore, + })), + totalHits: result.estimatedTotalHits ?? 0, + processingTimeMs: result.processingTimeMs, + }; + } + + async clearIndex(): Promise<void> { + const task = await this.index.deleteAllDocuments(); + await this.ensureTaskSuccess(task.taskUid); + } + + private async ensureTaskSuccess(taskUid: number): Promise<void> { + const task = await this.index.waitForTask(taskUid, { + intervalMs: 200, + timeOutMs: serverConfig.search.jobTimeoutSec * 1000 * 0.9, + }); + if (task.error) { + throw new Error(`Search task failed: ${task.error.message}`); + } + } +} + +export class MeiliSearchProvider implements PluginProvider<SearchIndexClient> { + private client: MeiliSearch | undefined; + private indexClient: SearchIndexClient | undefined; + private readonly indexName = "bookmarks"; + + constructor() { + if (MeiliSearchProvider.isConfigured()) { + this.client = new MeiliSearch({ + host: envConfig.MEILI_ADDR!, + apiKey: envConfig.MEILI_MASTER_KEY, + }); + } + } + + static isConfigured(): boolean { + return !!envConfig.MEILI_ADDR; + } + + async getClient(): Promise<SearchIndexClient | null> { + if (this.indexClient) { + return this.indexClient; + } + + if (!this.client) { + return null; + } + + const indices = await this.client.getIndexes(); + let indexFound = indices.results.find((i) => i.uid === this.indexName); + + if (!indexFound) { + const idx = await this.client.createIndex(this.indexName, { + primaryKey: "id", + }); + await this.client.waitForTask(idx.taskUid); + indexFound = await this.client.getIndex<BookmarkSearchDocument>( + this.indexName, + ); + } + + await this.configureIndex(indexFound); + this.indexClient = new MeiliSearchIndexClient(indexFound); + return this.indexClient; + } + + private async configureIndex( + index: Index<BookmarkSearchDocument>, + ): Promise<void> { + const desiredFilterableAttributes = ["id", "userId"].sort(); + const desiredSortableAttributes = ["createdAt"].sort(); + + const settings = await index.getSettings(); + + if ( + JSON.stringify(settings.filterableAttributes?.sort()) !== + JSON.stringify(desiredFilterableAttributes) + ) { + console.log( + `[meilisearch] Updating desired filterable attributes to ${desiredFilterableAttributes} from ${settings.filterableAttributes}`, + ); + const taskId = await index.updateFilterableAttributes( + desiredFilterableAttributes, + ); + await this.client!.waitForTask(taskId.taskUid); + } + + if ( + JSON.stringify(settings.sortableAttributes?.sort()) !== + JSON.stringify(desiredSortableAttributes) + ) { + console.log( + `[meilisearch] Updating desired sortable attributes to ${desiredSortableAttributes} from ${settings.sortableAttributes}`, + ); + const taskId = await index.updateSortableAttributes( + desiredSortableAttributes, + ); + await this.client!.waitForTask(taskId.taskUid); + } + } +} diff --git a/packages/plugins/tsconfig.json b/packages/plugins/tsconfig.json new file mode 100644 index 00000000..a795b96a --- /dev/null +++ b/packages/plugins/tsconfig.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "extends": "@karakeep/tsconfig/node.json", + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "compilerOptions": { + "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" + } +} diff --git a/packages/plugins/vitest.config.ts b/packages/plugins/vitest.config.ts new file mode 100644 index 00000000..3d5f33f7 --- /dev/null +++ b/packages/plugins/vitest.config.ts @@ -0,0 +1,14 @@ +/// <reference types="vitest" /> + +import tsconfigPaths from "vite-tsconfig-paths"; +import { defineConfig } from "vitest/config"; + +export default defineConfig({ + plugins: [tsconfigPaths()], + test: { + globalSetup: ["./queue-restate/src/tests/setup/startContainers.ts"], + teardownTimeout: 30000, + include: ["**/src/tests/**/*.test.ts"], + testTimeout: 60000, + }, +}); |
