From 99413db0e79a156a1b87eacd3c6a7b83e9df946e Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 8 Nov 2025 14:50:00 +0000 Subject: refactor: consolidate multiple karakeep plugins into one package (#2101) * refactor: consolidate plugin packages into single plugins directory - Create new `packages/plugins` directory with consolidated package.json - Move queue-liteque, queue-restate, and search-meilisearch to subdirectories - Update imports in packages/shared-server/src/plugins.ts - Remove individual plugin package directories - Update shared-server dependency to use @karakeep/plugins This reduces overhead of maintaining multiple separate packages for plugins. * refactor: consolidate plugin config files to root level - Move .oxlintrc.json to packages/plugins root - Move vitest.config.ts to packages/plugins root - Update vitest config paths to work from root - Remove individual config files from plugin subdirectories This reduces configuration duplication across plugin subdirectories. --------- Co-authored-by: Claude --- packages/plugins-queue-liteque/src/index.ts | 137 ---------------------------- 1 file changed, 137 deletions(-) delete mode 100644 packages/plugins-queue-liteque/src/index.ts (limited to 'packages/plugins-queue-liteque/src/index.ts') diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts deleted file mode 100644 index ddc2181c..00000000 --- a/packages/plugins-queue-liteque/src/index.ts +++ /dev/null @@ -1,137 +0,0 @@ -import path from "node:path"; -import { - buildDBClient, - SqliteQueue as LQ, - Runner as LQRunner, - migrateDB, -} from "liteque"; - -import type { PluginProvider } from "@karakeep/shared/plugins"; -import type { - DequeuedJob, - DequeuedJobError, - EnqueueOptions, - Queue, - QueueClient, - QueueOptions, - Runner, - RunnerFuncs, - RunnerOptions, -} from "@karakeep/shared/queueing"; -import serverConfig from "@karakeep/shared/config"; - -class LitequeQueueWrapper implements Queue { - constructor( - private readonly _name: string, - private readonly lq: LQ, - public readonly opts: QueueOptions, - ) {} - - name(): string { - return this._name; - } - - async enqueue( - payload: T, - options?: EnqueueOptions, - ): Promise { - const job = await this.lq.enqueue(payload, options); - // liteque returns a Job with numeric id - return job ? String(job.id) : undefined; - } - - async stats() { - return this.lq.stats(); - } - - async cancelAllNonRunning(): Promise { - return this.lq.cancelAllNonRunning(); - } - - // Internal accessor for runner - get _impl(): LQ { - return this.lq; - } -} - -class LitequeQueueClient implements QueueClient { - private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { - walEnabled: serverConfig.database.walMode, - }); - - private queues = new Map>(); - - async prepare(): Promise { - migrateDB(this.db); - } - - async start(): Promise { - // No-op for sqlite - } - - createQueue(name: string, options: QueueOptions): Queue { - if (this.queues.has(name)) { - throw new Error(`Queue ${name} already exists`); - } - const lq = new LQ(name, this.db, { - defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, - keepFailedJobs: options.keepFailedJobs, - }); - const wrapper = new LitequeQueueWrapper(name, lq, options); - this.queues.set(name, wrapper); - return wrapper; - } - - createRunner( - queue: Queue, - funcs: RunnerFuncs, - opts: RunnerOptions, - ): Runner { - const name = queue.name(); - let wrapper = this.queues.get(name); - if (!wrapper) { - throw new Error(`Queue ${name} not found`); - } - - const runner = new LQRunner( - wrapper._impl, - { - run: funcs.run, - onComplete: funcs.onComplete as - | ((job: DequeuedJob) => Promise) - | undefined, - onError: funcs.onError as - | ((job: DequeuedJobError) => Promise) - | undefined, - }, - { - pollIntervalMs: opts.pollIntervalMs ?? 1000, - timeoutSecs: opts.timeoutSecs, - concurrency: opts.concurrency, - validator: opts.validator, - }, - ); - - return { - run: () => runner.run(), - stop: () => runner.stop(), - runUntilEmpty: () => runner.runUntilEmpty(), - }; - } - - async shutdown(): Promise { - // No-op for sqlite - } -} - -export class LitequeQueueProvider implements PluginProvider { - private client: QueueClient | null = null; - - async getClient(): Promise { - if (!this.client) { - const client = new LitequeQueueClient(); - this.client = client; - } - return this.client; - } -} -- cgit v1.2.3-70-g09d2