diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:40 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-09-14 18:16:57 +0000 |
| commit | 8d32055485858210252096483bb20533dc8bdf60 (patch) | |
| tree | ce8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/plugins-queue-liteque | |
| parent | 6ba61b46154e076fca47d3841b158105dbeeef80 (diff) | |
| download | karakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst | |
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'packages/plugins-queue-liteque')
| -rw-r--r-- | packages/plugins-queue-liteque/.oxlintrc.json | 19 | ||||
| -rw-r--r-- | packages/plugins-queue-liteque/index.ts | 10 | ||||
| -rw-r--r-- | packages/plugins-queue-liteque/package.json | 27 | ||||
| -rw-r--r-- | packages/plugins-queue-liteque/src/index.ts | 133 | ||||
| -rw-r--r-- | packages/plugins-queue-liteque/tsconfig.json | 10 |
5 files changed, 199 insertions, 0 deletions
diff --git a/packages/plugins-queue-liteque/.oxlintrc.json b/packages/plugins-queue-liteque/.oxlintrc.json new file mode 100644 index 00000000..79ba0255 --- /dev/null +++ b/packages/plugins-queue-liteque/.oxlintrc.json @@ -0,0 +1,19 @@ +{ + "$schema": "../../node_modules/oxlint/configuration_schema.json", + "extends": [ + "../../tooling/oxlint/oxlint-base.json" + ], + "env": { + "builtin": true, + "commonjs": true + }, + "ignorePatterns": [ + "**/*.config.js", + "**/*.config.cjs", + "**/.eslintrc.cjs", + "**/.next", + "**/dist", + "**/build", + "**/pnpm-lock.yaml" + ] +} diff --git a/packages/plugins-queue-liteque/index.ts b/packages/plugins-queue-liteque/index.ts new file mode 100644 index 00000000..c3f7f03b --- /dev/null +++ b/packages/plugins-queue-liteque/index.ts @@ -0,0 +1,10 @@ +// Auto-register the Liteque queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { LitequeQueueProvider } from "./src"; + +PluginManager.register({ + type: PluginType.Queue, + name: "Liteque", + provider: new LitequeQueueProvider(), +}); diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json new file mode 100644 index 00000000..0a3916c1 --- /dev/null +++ b/packages/plugins-queue-liteque/package.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://json.schemastore.org/package.json", + "name": "@karakeep/plugins-queue-liteque", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "typecheck": "tsc --noEmit", + "format": "prettier . --ignore-path ../../.prettierignore", + "format:fix": "prettier . --write --ignore-path ../../.prettierignore", + "lint": "oxlint .", + "lint:fix": "oxlint . --fix", + "test": "vitest" + }, + "dependencies": { + "@karakeep/shared": "workspace:*", + "liteque": "^0.6.0" + }, + "devDependencies": { + "@karakeep/prettier-config": "workspace:^0.1.0", + "@karakeep/tsconfig": "workspace:^0.1.0", + "vite-tsconfig-paths": "^4.3.1", + "vitest": "^3.2.4" + }, + "prettier": "@karakeep/prettier-config" +} + diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts new file mode 100644 index 00000000..3da161d8 --- /dev/null +++ b/packages/plugins-queue-liteque/src/index.ts @@ -0,0 +1,133 @@ +import path from "node:path"; +import { + buildDBClient, + SqliteQueue as LQ, + Runner as LQRunner, + migrateDB, +} from "liteque"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + DequeuedJob, + DequeuedJobError, + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import serverConfig from "@karakeep/shared/config"; + +class LitequeQueueWrapper<T> implements Queue<T> { + constructor( + private readonly _name: string, + private readonly lq: LQ<T>, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise<string | undefined> { + const job = await this.lq.enqueue(payload, options); + // liteque returns a Job with numeric id + return job ? String(job.id) : undefined; + } + + async stats() { + return this.lq.stats(); + } + + async cancelAllNonRunning(): Promise<number> { + return this.lq.cancelAllNonRunning(); + } + + // Internal accessor for runner + get _impl(): LQ<T> { + return this.lq; + } +} + +class LitequeQueueClient implements QueueClient { + private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { + walEnabled: serverConfig.database.walMode, + }); + + private queues = new Map<string, LitequeQueueWrapper<unknown>>(); + + async init(): Promise<void> { + migrateDB(this.db); + } + + createQueue<T>(name: string, options: QueueOptions): Queue<T> { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const lq = new LQ<T>(name, this.db, { + defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, + keepFailedJobs: options.keepFailedJobs, + }); + const wrapper = new LitequeQueueWrapper<T>(name, lq); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner<T>( + queue: Queue<T>, + funcs: RunnerFuncs<T>, + opts: RunnerOptions<T>, + ): Runner<T> { + const name = queue.name(); + let wrapper = this.queues.get(name); + if (!wrapper) { + throw new Error(`Queue ${name} not found`); + } + + const runner = new LQRunner<T>( + wrapper._impl, + { + run: funcs.run, + onComplete: funcs.onComplete as + | ((job: DequeuedJob<T>) => Promise<void>) + | undefined, + onError: funcs.onError as + | ((job: DequeuedJobError<T>) => Promise<void>) + | undefined, + }, + { + pollIntervalMs: opts.pollIntervalMs ?? 1000, + timeoutSecs: opts.timeoutSecs, + concurrency: opts.concurrency, + validator: opts.validator, + }, + ); + + return { + run: () => runner.run(), + stop: () => runner.stop(), + runUntilEmpty: () => runner.runUntilEmpty(), + }; + } + + async shutdown(): Promise<void> { + // No-op for sqlite + } +} + +export class LitequeQueueProvider implements PluginProvider<QueueClient> { + private client: QueueClient | null = null; + + async getClient(): Promise<QueueClient | null> { + if (!this.client) { + const client = new LitequeQueueClient(); + await client.init(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins-queue-liteque/tsconfig.json b/packages/plugins-queue-liteque/tsconfig.json new file mode 100644 index 00000000..3bfa695c --- /dev/null +++ b/packages/plugins-queue-liteque/tsconfig.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "extends": "@karakeep/tsconfig/node.json", + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "compilerOptions": { + "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" + } +} + |
