import path from "node:path";
import {
  buildDBClient,
  SqliteQueue as LQ,
  Runner as LQRunner,
  migrateDB,
} from "liteque";

import type { PluginProvider } from "@karakeep/shared/plugins";
import type {
  EnqueueOptions,
  Queue,
  QueueClient,
  QueueOptions,
  Runner,
  RunnerFuncs,
  RunnerOptions,
} from "@karakeep/shared/queueing";
import serverConfig from "@karakeep/shared/config";

/** Poll interval handed to the liteque runner when the caller does not set one. */
const DEFAULT_POLL_INTERVAL_MS = 1000;

/**
 * Adapts a liteque `SqliteQueue` to the shared `Queue` interface.
 *
 * @typeParam T - Payload type carried by jobs on this queue.
 */
class LitequeQueueWrapper<T> implements Queue<T> {
  /**
   * @param _name - Name reported by {@link name}.
   * @param lq - The liteque queue every call is delegated to.
   * @param opts - The options the queue was created with, exposed to callers.
   */
  constructor(
    private readonly _name: string,
    private readonly lq: LQ<T>,
    public readonly opts: QueueOptions,
  ) {}

  /** Returns the name this queue was created with. */
  name(): string {
    return this._name;
  }

  /**
   * Enqueues a payload on the underlying liteque queue.
   *
   * @param payload - The job payload.
   * @param options - Passed to liteque unchanged.
   * @returns The job id as a string, or `undefined` when liteque returns no job.
   */
  async enqueue(
    payload: T,
    options?: EnqueueOptions,
  ): Promise<string | undefined> {
    const job = await this.lq.enqueue(payload, options);
    // liteque returns a Job with numeric id
    return job ? String(job.id) : undefined;
  }

  /** Returns the statistics reported by the underlying liteque queue. */
  async stats() {
    return this.lq.stats();
  }

  /**
   * Delegates to liteque's `cancelAllNonRunning`.
   *
   * The return type is derived from liteque's own signature so this wrapper
   * cannot drift from it.
   */
  async cancelAllNonRunning(): Promise<
    Awaited<ReturnType<LQ<T>["cancelAllNonRunning"]>>
  > {
    return this.lq.cancelAllNonRunning();
  }

  // Internal accessor for runner
  get _impl(): LQ<T> {
    return this.lq;
  }
}

/**
 * `QueueClient` backed by liteque, storing jobs in `queue.db` inside the
 * configured data directory.
 */
class LitequeQueueClient implements QueueClient {
  private readonly db = buildDBClient(
    path.join(serverConfig.dataDir, "queue.db"),
    {
      walEnabled: serverConfig.database.walMode,
    },
  );

  // Queues created through this client, keyed by name. Payload types differ
  // per queue, so the element type is erased to `unknown`.
  private readonly queues = new Map<string, LitequeQueueWrapper<unknown>>();

  /** Runs liteque's migrations against the queue database. */
  async prepare(): Promise<void> {
    migrateDB(this.db);
  }

  async start(): Promise<void> {
    // No-op for sqlite
  }

  /**
   * Creates a queue and registers it under `name`.
   *
   * Only `defaultJobArgs.numRetries` and `keepFailedJobs` are forwarded to
   * liteque; the full `options` object is kept on the returned wrapper.
   *
   * @throws Error if a queue with the same name was already created by this
   *   client.
   */
  createQueue<T>(name: string, options: QueueOptions): Queue<T> {
    if (this.queues.has(name)) {
      throw new Error(`Queue ${name} already exists`);
    }

    const lq = new LQ<T>(name, this.db, {
      defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries },
      keepFailedJobs: options.keepFailedJobs,
    });
    const wrapper = new LitequeQueueWrapper<T>(name, lq, options);
    this.queues.set(name, wrapper);
    return wrapper;
  }

  /**
   * Creates a runner for a queue previously created by {@link createQueue}.
   *
   * The queue is looked up by `queue.name()`, not by object identity.
   *
   * @param queue - Queue whose name identifies the liteque queue to run.
   * @param funcs - Callbacks; `run`, `onComplete` and `onError` are forwarded.
   * @param opts - Runner options; `pollIntervalMs` defaults to 1000.
   * @throws Error if no queue with that name was created by this client.
   */
  createRunner<T>(
    queue: Queue<T>,
    funcs: RunnerFuncs<T>,
    opts: RunnerOptions<T>,
  ): Runner<T> {
    const name = queue.name();
    const wrapper = this.queues.get(name);
    if (!wrapper) {
      throw new Error(`Queue ${name} not found`);
    }

    const runner = new LQRunner<T>(
      wrapper._impl,
      {
        run: funcs.run,
        onComplete: funcs.onComplete,
        onError: funcs.onError,
      },
      {
        pollIntervalMs: opts.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS,
        timeoutSecs: opts.timeoutSecs,
        concurrency: opts.concurrency,
        validator: opts.validator,
      },
    );

    // Expose only the subset of the liteque runner that `Runner` requires.
    return {
      run: () => runner.run(),
      stop: () => runner.stop(),
      runUntilEmpty: () => runner.runUntilEmpty(),
    };
  }

  async shutdown(): Promise<void> {
    // No-op for sqlite
  }
}

/**
 * Plugin entry point that hands out a single, lazily created liteque-backed
 * `QueueClient`.
 */
export class LitequeQueueProvider implements PluginProvider<QueueClient> {
  private client: QueueClient | null = null;

  /** Returns the shared client, constructing it on the first call. */
  async getClient(): Promise<QueueClient> {
    if (!this.client) {
      const client = new LitequeQueueClient();
      this.client = client;
    }
    return this.client;
  }
}