From 99413db0e79a156a1b87eacd3c6a7b83e9df946e Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 8 Nov 2025 14:50:00 +0000 Subject: refactor: consolidate multiple karakeep plugins into one package (#2101) * refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude --- packages/plugins/queue-liteque/index.ts | 10 ++ packages/plugins/queue-liteque/src/index.ts | 137 ++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 packages/plugins/queue-liteque/index.ts create mode 100644 packages/plugins/queue-liteque/src/index.ts (limited to 'packages/plugins/queue-liteque') diff --git a/packages/plugins/queue-liteque/index.ts b/packages/plugins/queue-liteque/index.ts new file mode 100644 index 00000000..c3f7f03b --- /dev/null +++ b/packages/plugins/queue-liteque/index.ts @@ -0,0 +1,10 @@ +// Auto-register the Liteque queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { LitequeQueueProvider } from "./src"; + +PluginManager.register({ + type: PluginType.Queue, + name: "Liteque", + provider: new LitequeQueueProvider(), +}); diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts new file mode 100644 index 00000000..ddc2181c --- /dev/null +++ b/packages/plugins/queue-liteque/src/index.ts @@ -0,0 +1,137 @@ +import path from "node:path"; +import { + buildDBClient, + SqliteQueue as LQ, + Runner as LQRunner, + migrateDB, +} from "liteque"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + DequeuedJob, + DequeuedJobError, + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import serverConfig from "@karakeep/shared/config"; + +class LitequeQueueWrapper implements Queue { + constructor( + private readonly _name: string, + private readonly lq: LQ, + public readonly opts: QueueOptions, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise { + const job = await this.lq.enqueue(payload, options); + // liteque returns a Job with numeric id + return job ? String(job.id) : undefined; + } + + async stats() { + return this.lq.stats(); + } + + async cancelAllNonRunning(): Promise { + return this.lq.cancelAllNonRunning(); + } + + // Internal accessor for runner + get _impl(): LQ { + return this.lq; + } +} + +class LitequeQueueClient implements QueueClient { + private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { + walEnabled: serverConfig.database.walMode, + }); + + private queues = new Map>(); + + async prepare(): Promise { + migrateDB(this.db); + } + + async start(): Promise { + // No-op for sqlite + } + + createQueue(name: string, options: QueueOptions): Queue { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const lq = new LQ(name, this.db, { + defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, + keepFailedJobs: options.keepFailedJobs, + }); + const wrapper = new LitequeQueueWrapper(name, lq, options); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner { + const name = queue.name(); + let wrapper = this.queues.get(name); + if (!wrapper) { + throw new Error(`Queue ${name} not found`); + } + + const runner = new LQRunner( + wrapper._impl, + { + run: funcs.run, + onComplete: funcs.onComplete as + | ((job: DequeuedJob) => Promise) + | undefined, + onError: funcs.onError as + | ((job: DequeuedJobError) => Promise) + | undefined, + }, + { + pollIntervalMs: opts.pollIntervalMs ?? 1000, + timeoutSecs: opts.timeoutSecs, + concurrency: opts.concurrency, + validator: opts.validator, + }, + ); + + return { + run: () => runner.run(), + stop: () => runner.stop(), + runUntilEmpty: () => runner.runUntilEmpty(), + }; + } + + async shutdown(): Promise { + // No-op for sqlite + } +} + +export class LitequeQueueProvider implements PluginProvider { + private client: QueueClient | null = null; + + async getClient(): Promise { + if (!this.client) { + const client = new LitequeQueueClient(); + this.client = client; + } + return this.client; + } +} -- cgit v1.2.3-70-g09d2