aboutsummaryrefslogtreecommitdiffstats
path: root/packages/plugins-queue-liteque
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-14 18:16:40 +0000
committerMohamed Bassem <me@mbassem.com>2025-09-14 18:16:57 +0000
commit8d32055485858210252096483bb20533dc8bdf60 (patch)
treece8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /packages/plugins-queue-liteque
parent6ba61b46154e076fca47d3841b158105dbeeef80 (diff)
downloadkarakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'packages/plugins-queue-liteque')
-rw-r--r--packages/plugins-queue-liteque/.oxlintrc.json19
-rw-r--r--packages/plugins-queue-liteque/index.ts10
-rw-r--r--packages/plugins-queue-liteque/package.json27
-rw-r--r--packages/plugins-queue-liteque/src/index.ts133
-rw-r--r--packages/plugins-queue-liteque/tsconfig.json10
5 files changed, 199 insertions, 0 deletions
diff --git a/packages/plugins-queue-liteque/.oxlintrc.json b/packages/plugins-queue-liteque/.oxlintrc.json
new file mode 100644
index 00000000..79ba0255
--- /dev/null
+++ b/packages/plugins-queue-liteque/.oxlintrc.json
@@ -0,0 +1,19 @@
+{
+ "$schema": "../../node_modules/oxlint/configuration_schema.json",
+ "extends": [
+ "../../tooling/oxlint/oxlint-base.json"
+ ],
+ "env": {
+ "builtin": true,
+ "commonjs": true
+ },
+ "ignorePatterns": [
+ "**/*.config.js",
+ "**/*.config.cjs",
+ "**/.eslintrc.cjs",
+ "**/.next",
+ "**/dist",
+ "**/build",
+ "**/pnpm-lock.yaml"
+ ]
+}
diff --git a/packages/plugins-queue-liteque/index.ts b/packages/plugins-queue-liteque/index.ts
new file mode 100644
index 00000000..c3f7f03b
--- /dev/null
+++ b/packages/plugins-queue-liteque/index.ts
@@ -0,0 +1,10 @@
+// Auto-register the Liteque queue provider when this package is imported
+import { PluginManager, PluginType } from "@karakeep/shared/plugins";
+
+import { LitequeQueueProvider } from "./src";
+
+PluginManager.register({
+ type: PluginType.Queue,
+ name: "Liteque",
+ provider: new LitequeQueueProvider(),
+});
diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json
new file mode 100644
index 00000000..0a3916c1
--- /dev/null
+++ b/packages/plugins-queue-liteque/package.json
@@ -0,0 +1,27 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@karakeep/plugins-queue-liteque",
+ "version": "0.1.0",
+ "private": true,
+ "type": "module",
+ "scripts": {
+ "typecheck": "tsc --noEmit",
+ "format": "prettier . --ignore-path ../../.prettierignore",
+ "format:fix": "prettier . --write --ignore-path ../../.prettierignore",
+ "lint": "oxlint .",
+ "lint:fix": "oxlint . --fix",
+ "test": "vitest"
+ },
+ "dependencies": {
+ "@karakeep/shared": "workspace:*",
+ "liteque": "^0.6.0"
+ },
+ "devDependencies": {
+ "@karakeep/prettier-config": "workspace:^0.1.0",
+ "@karakeep/tsconfig": "workspace:^0.1.0",
+ "vite-tsconfig-paths": "^4.3.1",
+ "vitest": "^3.2.4"
+ },
+ "prettier": "@karakeep/prettier-config"
+}
+
diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts
new file mode 100644
index 00000000..3da161d8
--- /dev/null
+++ b/packages/plugins-queue-liteque/src/index.ts
@@ -0,0 +1,133 @@
+import path from "node:path";
+import {
+ buildDBClient,
+ SqliteQueue as LQ,
+ Runner as LQRunner,
+ migrateDB,
+} from "liteque";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ DequeuedJob,
+ DequeuedJobError,
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import serverConfig from "@karakeep/shared/config";
+
+class LitequeQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly lq: LQ<T>,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ const job = await this.lq.enqueue(payload, options);
+ // liteque returns a Job with numeric id
+ return job ? String(job.id) : undefined;
+ }
+
+ async stats() {
+ return this.lq.stats();
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ return this.lq.cancelAllNonRunning();
+ }
+
+ // Internal accessor for runner
+ get _impl(): LQ<T> {
+ return this.lq;
+ }
+}
+
+class LitequeQueueClient implements QueueClient {
+ private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), {
+ walEnabled: serverConfig.database.walMode,
+ });
+
+ private queues = new Map<string, LitequeQueueWrapper<unknown>>();
+
+ async init(): Promise<void> {
+ migrateDB(this.db);
+ }
+
+ createQueue<T>(name: string, options: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const lq = new LQ<T>(name, this.db, {
+ defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries },
+ keepFailedJobs: options.keepFailedJobs,
+ });
+ const wrapper = new LitequeQueueWrapper<T>(name, lq);
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.queues.get(name);
+ if (!wrapper) {
+ throw new Error(`Queue ${name} not found`);
+ }
+
+ const runner = new LQRunner<T>(
+ wrapper._impl,
+ {
+ run: funcs.run,
+ onComplete: funcs.onComplete as
+ | ((job: DequeuedJob<T>) => Promise<void>)
+ | undefined,
+ onError: funcs.onError as
+ | ((job: DequeuedJobError<T>) => Promise<void>)
+ | undefined,
+ },
+ {
+ pollIntervalMs: opts.pollIntervalMs ?? 1000,
+ timeoutSecs: opts.timeoutSecs,
+ concurrency: opts.concurrency,
+ validator: opts.validator,
+ },
+ );
+
+ return {
+ run: () => runner.run(),
+ stop: () => runner.stop(),
+ runUntilEmpty: () => runner.runUntilEmpty(),
+ };
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class LitequeQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new LitequeQueueClient();
+ await client.init();
+ this.client = client;
+ }
+ return this.client;
+ }
+}
diff --git a/packages/plugins-queue-liteque/tsconfig.json b/packages/plugins-queue-liteque/tsconfig.json
new file mode 100644
index 00000000..3bfa695c
--- /dev/null
+++ b/packages/plugins-queue-liteque/tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@karakeep/tsconfig/node.json",
+ "include": ["**/*.ts"],
+ "exclude": ["node_modules"],
+ "compilerOptions": {
+ "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
+ }
+}
+