aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-14 18:16:40 +0000
committerMohamed Bassem <me@mbassem.com>2025-09-14 18:16:57 +0000
commit8d32055485858210252096483bb20533dc8bdf60 (patch)
treece8a1373411d1ce40aa0dbe6c37e707f0dbf4c98
parent6ba61b46154e076fca47d3841b158105dbeeef80 (diff)
downloadkarakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst
refactor: Move callsites to liteque to be behind a plugin
-rw-r--r--apps/workers/index.ts3
-rw-r--r--apps/workers/package.json1
-rw-r--r--apps/workers/workers/assetPreprocessingWorker.ts70
-rw-r--r--apps/workers/workers/crawlerWorker.ts28
-rw-r--r--apps/workers/workers/feedWorker.ts11
-rw-r--r--apps/workers/workers/inference/inferenceWorker.ts10
-rw-r--r--apps/workers/workers/inference/summarize.ts4
-rw-r--r--apps/workers/workers/inference/tagging.ts14
-rw-r--r--apps/workers/workers/ruleEngineWorker.ts14
-rw-r--r--apps/workers/workers/searchWorker.ts59
-rw-r--r--apps/workers/workers/tidyAssetsWorker.ts12
-rw-r--r--apps/workers/workers/videoWorker.ts19
-rw-r--r--apps/workers/workers/webhookWorker.ts12
-rw-r--r--packages/plugins-queue-liteque/.oxlintrc.json19
-rw-r--r--packages/plugins-queue-liteque/index.ts10
-rw-r--r--packages/plugins-queue-liteque/package.json27
-rw-r--r--packages/plugins-queue-liteque/src/index.ts133
-rw-r--r--packages/plugins-queue-liteque/tsconfig.json10
-rw-r--r--packages/shared-server/package.json1
-rw-r--r--packages/shared-server/src/index.ts1
-rw-r--r--packages/shared-server/src/plugins.ts2
-rw-r--r--packages/shared-server/src/queues.ts (renamed from packages/shared/queues.ts)48
-rw-r--r--packages/shared/index.ts1
-rw-r--r--packages/shared/package.json1
-rw-r--r--packages/shared/plugins.ts28
-rw-r--r--packages/shared/queueing.ts84
-rw-r--r--packages/trpc/lib/__tests__/ruleEngine.test.ts4
-rw-r--r--packages/trpc/lib/ruleEngine.ts2
-rw-r--r--packages/trpc/models/lists.ts2
-rw-r--r--packages/trpc/models/tags.ts2
-rw-r--r--packages/trpc/package.json1
-rw-r--r--packages/trpc/routers/admin.ts2
-rw-r--r--packages/trpc/routers/bookmarks.ts20
-rw-r--r--packages/trpc/routers/feeds.ts2
-rw-r--r--packages/trpc/stats.ts2
-rw-r--r--packages/trpc/testUtils.ts33
-rw-r--r--packages/trpc/vitest.config.ts4
-rw-r--r--pnpm-lock.yaml34
38 files changed, 516 insertions, 214 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index cd860fc0..578ff6c8 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,10 +2,9 @@ import "dotenv/config";
import { buildServer } from "server";
-import { loadAllPlugins } from "@karakeep/shared-server";
+import { loadAllPlugins, runQueueDBMigrations } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
-import { runQueueDBMigrations } from "@karakeep/shared/queues";
import { shutdownPromise } from "./exit";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 6de801dd..030250e0 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -24,7 +24,6 @@
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.6",
"jsdom": "^24.0.0",
- "liteque": "^0.6.0",
"metascraper": "^5.46.18",
"metascraper-amazon": "^5.46.18",
"metascraper-author": "5.46.18",
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index d2df1724..2dcec59b 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,12 +1,11 @@
import os from "os";
import { eq } from "drizzle-orm";
-import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
-import type { AssetPreprocessingRequest } from "@karakeep/shared/queues";
+import type { AssetPreprocessingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import {
assets,
@@ -14,44 +13,53 @@ import {
bookmarkAssets,
bookmarks,
} from "@karakeep/db/schema";
-import { QuotaService, StorageQuotaError } from "@karakeep/shared-server";
-import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
AssetPreprocessingQueue,
OpenAIQueue,
+ QuotaService,
+ StorageQuotaError,
triggerSearchReindex,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import {
+ DequeuedJob,
+ EnqueueOptions,
+ getQueueClient,
+} from "@karakeep/shared/queueing";
export class AssetPreprocessingWorker {
- static build() {
+ static async build() {
logger.info("Starting asset preprocessing worker ...");
- const worker = new Runner<AssetPreprocessingRequest>(
- AssetPreprocessingQueue,
- {
- run: run,
- onComplete: async (job) => {
- workerStatsCounter.labels("assetPreprocessing", "completed").inc();
- const jobId = job.id;
- logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
- return Promise.resolve();
+ const worker =
+ (await getQueueClient())!.createRunner<AssetPreprocessingRequest>(
+ AssetPreprocessingQueue,
+ {
+ run: run,
+ onComplete: async (job) => {
+ workerStatsCounter.labels("assetPreprocessing", "completed").inc();
+ const jobId = job.id;
+ logger.info(
+ `[assetPreprocessing][${jobId}] Completed successfully`,
+ );
+ return Promise.resolve();
+ },
+ onError: async (job) => {
+ workerStatsCounter.labels("assetPreProcessing", "failed").inc();
+ const jobId = job.id;
+ logger.error(
+ `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
},
- onError: async (job) => {
- workerStatsCounter.labels("assetPreProcessing", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
+ {
+ concurrency: serverConfig.assetPreprocessing.numWorkers,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
- },
- {
- concurrency: serverConfig.assetPreprocessing.numWorkers,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
+ );
return worker;
}
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 79d8e06a..baea1346 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -10,7 +10,6 @@ import { eq } from "drizzle-orm";
import { execa } from "execa";
import { exitAbortController } from "exit";
import { JSDOM, VirtualConsole } from "jsdom";
-import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import metascraper from "metascraper";
import metascraperAmazon from "metascraper-amazon";
import metascraperAuthor from "metascraper-author";
@@ -30,7 +29,7 @@ import { fetchWithProxy } from "utils";
import { getBookmarkDetails, updateAsset } from "workerUtils";
import { z } from "zod";
-import type { ZCrawlLinkRequest } from "@karakeep/shared/queues";
+import type { ZCrawlLinkRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import {
assets,
@@ -40,7 +39,16 @@ import {
bookmarks,
users,
} from "@karakeep/db/schema";
-import { QuotaService } from "@karakeep/shared-server";
+import {
+ AssetPreprocessingQueue,
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ QuotaService,
+ triggerSearchReindex,
+ triggerWebhook,
+ VideoWorkerQueue,
+ zCrawlLinkRequestSchema,
+} from "@karakeep/shared-server";
import {
ASSET_TYPES,
getAssetSize,
@@ -55,14 +63,10 @@ import {
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import {
- AssetPreprocessingQueue,
- LinkCrawlerQueue,
- OpenAIQueue,
- triggerSearchReindex,
- triggerWebhook,
- VideoWorkerQueue,
- zCrawlLinkRequestSchema,
-} from "@karakeep/shared/queues";
+ DequeuedJob,
+ EnqueueOptions,
+ getQueueClient,
+} from "@karakeep/shared/queueing";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -247,7 +251,7 @@ export class CrawlerWorker {
}
logger.info("Starting crawler worker ...");
- const worker = new Runner<ZCrawlLinkRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZCrawlLinkRequest>(
LinkCrawlerQueue,
{
run: runCrawler,
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 2ece4890..048d9696 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -1,5 +1,4 @@
import { and, eq, inArray } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import cron from "node-cron";
import Parser from "rss-parser";
@@ -7,12 +6,12 @@ import { buildImpersonatingTRPCClient } from "trpc";
import { fetchWithProxy } from "utils";
import { z } from "zod";
-import type { ZFeedRequestSchema } from "@karakeep/shared/queues";
+import type { ZFeedRequestSchema } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema";
-import { QuotaService } from "@karakeep/shared-server";
+import { FeedQueue, QuotaService } from "@karakeep/shared-server";
import logger from "@karakeep/shared/logger";
-import { FeedQueue } from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
export const FeedRefreshingWorker = cron.schedule(
@@ -46,9 +45,9 @@ export const FeedRefreshingWorker = cron.schedule(
);
export class FeedWorker {
- static build() {
+ static async build() {
logger.info("Starting feed worker ...");
- const worker = new Runner<ZFeedRequestSchema>(
+ const worker = (await getQueueClient())!.createRunner<ZFeedRequestSchema>(
FeedQueue,
{
run: run,
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
index 32de3806..065462b3 100644
--- a/apps/workers/workers/inference/inferenceWorker.ts
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -1,14 +1,14 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
-import type { ZOpenAIRequest } from "@karakeep/shared/queues";
+import type { ZOpenAIRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
+import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import { InferenceClientFactory } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
-import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { runSummarization } from "./summarize";
import { runTagging } from "./tagging";
@@ -37,9 +37,9 @@ async function attemptMarkStatus(
}
export class OpenAiWorker {
- static build() {
+ static async build() {
logger.info("Starting inference worker ...");
- const worker = new Runner<ZOpenAIRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZOpenAIRequest>(
OpenAIQueue,
{
run: runOpenAI,
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 02a7f4e0..0b25470e 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -1,13 +1,13 @@
import { and, eq } from "drizzle-orm";
-import { DequeuedJob } from "liteque";
import { db } from "@karakeep/db";
import { bookmarks, customPrompts } from "@karakeep/db/schema";
+import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import { InferenceClient } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
import { buildSummaryPrompt } from "@karakeep/shared/prompts";
-import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues";
+import { DequeuedJob } from "@karakeep/shared/queueing";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 6dd146af..0d245644 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,13 +1,12 @@
import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob, EnqueueOptions } from "liteque";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
+import type { ZOpenAIRequest } from "@karakeep/shared-server";
import type {
InferenceClient,
InferenceResponse,
} from "@karakeep/shared/inference";
-import type { ZOpenAIRequest } from "@karakeep/shared/queues";
import { db } from "@karakeep/db";
import {
bookmarks,
@@ -15,15 +14,16 @@ import {
customPrompts,
tagsOnBookmarks,
} from "@karakeep/db/schema";
-import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
-import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
import {
triggerRuleEngineOnEvent,
triggerSearchReindex,
triggerWebhook,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
+import { DequeuedJob, EnqueueOptions } from "@karakeep/shared/queueing";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
const openAIResponseSchema = z.object({
diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 2a4fbb1a..37c7f595 100644
--- a/apps/workers/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
@@ -1,23 +1,23 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { buildImpersonatingAuthedContext } from "trpc";
-import type { ZRuleEngineRequest } from "@karakeep/shared/queues";
+import type { ZRuleEngineRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
RuleEngineQueue,
zRuleEngineRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine";
export class RuleEngineWorker {
- static build() {
+ static async build() {
logger.info("Starting rule engine worker ...");
- const worker = new Runner<ZRuleEngineRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZRuleEngineRequest>(
RuleEngineQueue,
{
run: runRuleEngine,
diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index 1433ef3c..8f043c1d 100644
--- a/apps/workers/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
@@ -1,16 +1,16 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
-import type { ZSearchIndexingRequest } from "@karakeep/shared/queues";
+import type { ZSearchIndexingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
SearchIndexingQueue,
zSearchIndexingRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import {
BookmarkSearchDocument,
getSearchClient,
@@ -19,33 +19,34 @@ import {
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
export class SearchIndexingWorker {
- static build() {
+ static async build() {
logger.info("Starting search indexing worker ...");
- const worker = new Runner<ZSearchIndexingRequest>(
- SearchIndexingQueue,
- {
- run: runSearchIndexing,
- onComplete: (job) => {
- workerStatsCounter.labels("search", "completed").inc();
- const jobId = job.id;
- logger.info(`[search][${jobId}] Completed successfully`);
- return Promise.resolve();
+ const worker =
+ (await getQueueClient())!.createRunner<ZSearchIndexingRequest>(
+ SearchIndexingQueue,
+ {
+ run: runSearchIndexing,
+ onComplete: (job) => {
+ workerStatsCounter.labels("search", "completed").inc();
+ const jobId = job.id;
+ logger.info(`[search][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ workerStatsCounter.labels("search", "failed").inc();
+ const jobId = job.id;
+ logger.error(
+ `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
},
- onError: (job) => {
- workerStatsCounter.labels("search", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
+ {
+ concurrency: serverConfig.search.numWorkers,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
- },
- {
- concurrency: serverConfig.search.numWorkers,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
+ );
return worker;
}
diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
index cf3e33b6..b5b95185 100644
--- a/apps/workers/workers/tidyAssetsWorker.ts
+++ b/apps/workers/workers/tidyAssetsWorker.ts
@@ -1,21 +1,21 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { assets } from "@karakeep/db/schema";
-import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
-import logger from "@karakeep/shared/logger";
import {
TidyAssetsQueue,
ZTidyAssetsRequest,
zTidyAssetsRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
export class TidyAssetsWorker {
- static build() {
+ static async build() {
logger.info("Starting tidy assets worker ...");
- const worker = new Runner<ZTidyAssetsRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>(
TidyAssetsQueue,
{
run: runTidyAssets,
diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index 68be0126..a41eb069 100644
--- a/apps/workers/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -2,12 +2,17 @@ import fs from "fs";
import * as os from "os";
import path from "path";
import { execa } from "execa";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
-import { QuotaService, StorageQuotaError } from "@karakeep/shared-server";
+import {
+ QuotaService,
+ StorageQuotaError,
+ VideoWorkerQueue,
+ ZVideoRequest,
+ zvideoRequestSchema,
+} from "@karakeep/shared-server";
import {
ASSET_TYPES,
newAssetId,
@@ -16,21 +21,17 @@ import {
} from "@karakeep/shared/assetdb";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
-import {
- VideoWorkerQueue,
- ZVideoRequest,
- zvideoRequestSchema,
-} from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { getBookmarkDetails, updateAsset } from "../workerUtils";
const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads");
export class VideoWorker {
- static build() {
+ static async build() {
logger.info("Starting video worker ...");
- return new Runner<ZVideoRequest>(
+ return (await getQueueClient())!.createRunner<ZVideoRequest>(
VideoWorkerQueue,
{
run: runWorker,
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 96070e22..2bbef160 100644
--- a/apps/workers/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
@@ -1,22 +1,22 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import fetch from "node-fetch";
import { db } from "@karakeep/db";
import { bookmarks, webhooksTable } from "@karakeep/db/schema";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
WebhookQueue,
ZWebhookRequest,
zWebhookRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
export class WebhookWorker {
- static build() {
+ static async build() {
logger.info("Starting webhook worker ...");
- const worker = new Runner<ZWebhookRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZWebhookRequest>(
WebhookQueue,
{
run: runWebhook,
diff --git a/packages/plugins-queue-liteque/.oxlintrc.json b/packages/plugins-queue-liteque/.oxlintrc.json
new file mode 100644
index 00000000..79ba0255
--- /dev/null
+++ b/packages/plugins-queue-liteque/.oxlintrc.json
@@ -0,0 +1,19 @@
+{
+ "$schema": "../../node_modules/oxlint/configuration_schema.json",
+ "extends": [
+ "../../tooling/oxlint/oxlint-base.json"
+ ],
+ "env": {
+ "builtin": true,
+ "commonjs": true
+ },
+ "ignorePatterns": [
+ "**/*.config.js",
+ "**/*.config.cjs",
+ "**/.eslintrc.cjs",
+ "**/.next",
+ "**/dist",
+ "**/build",
+ "**/pnpm-lock.yaml"
+ ]
+}
diff --git a/packages/plugins-queue-liteque/index.ts b/packages/plugins-queue-liteque/index.ts
new file mode 100644
index 00000000..c3f7f03b
--- /dev/null
+++ b/packages/plugins-queue-liteque/index.ts
@@ -0,0 +1,10 @@
+// Auto-register the Liteque queue provider when this package is imported
+import { PluginManager, PluginType } from "@karakeep/shared/plugins";
+
+import { LitequeQueueProvider } from "./src";
+
+PluginManager.register({
+ type: PluginType.Queue,
+ name: "Liteque",
+ provider: new LitequeQueueProvider(),
+});
diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json
new file mode 100644
index 00000000..0a3916c1
--- /dev/null
+++ b/packages/plugins-queue-liteque/package.json
@@ -0,0 +1,27 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@karakeep/plugins-queue-liteque",
+ "version": "0.1.0",
+ "private": true,
+ "type": "module",
+ "scripts": {
+ "typecheck": "tsc --noEmit",
+ "format": "prettier . --ignore-path ../../.prettierignore",
+ "format:fix": "prettier . --write --ignore-path ../../.prettierignore",
+ "lint": "oxlint .",
+ "lint:fix": "oxlint . --fix",
+ "test": "vitest"
+ },
+ "dependencies": {
+ "@karakeep/shared": "workspace:*",
+ "liteque": "^0.6.0"
+ },
+ "devDependencies": {
+ "@karakeep/prettier-config": "workspace:^0.1.0",
+ "@karakeep/tsconfig": "workspace:^0.1.0",
+ "vite-tsconfig-paths": "^4.3.1",
+ "vitest": "^3.2.4"
+ },
+ "prettier": "@karakeep/prettier-config"
+}
+
diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts
new file mode 100644
index 00000000..3da161d8
--- /dev/null
+++ b/packages/plugins-queue-liteque/src/index.ts
@@ -0,0 +1,133 @@
+import path from "node:path";
+import {
+ buildDBClient,
+ SqliteQueue as LQ,
+ Runner as LQRunner,
+ migrateDB,
+} from "liteque";
+
+import type { PluginProvider } from "@karakeep/shared/plugins";
+import type {
+ DequeuedJob,
+ DequeuedJobError,
+ EnqueueOptions,
+ Queue,
+ QueueClient,
+ QueueOptions,
+ Runner,
+ RunnerFuncs,
+ RunnerOptions,
+} from "@karakeep/shared/queueing";
+import serverConfig from "@karakeep/shared/config";
+
+class LitequeQueueWrapper<T> implements Queue<T> {
+ constructor(
+ private readonly _name: string,
+ private readonly lq: LQ<T>,
+ ) {}
+
+ name(): string {
+ return this._name;
+ }
+
+ async enqueue(
+ payload: T,
+ options?: EnqueueOptions,
+ ): Promise<string | undefined> {
+ const job = await this.lq.enqueue(payload, options);
+ // liteque returns a Job with numeric id
+ return job ? String(job.id) : undefined;
+ }
+
+ async stats() {
+ return this.lq.stats();
+ }
+
+ async cancelAllNonRunning(): Promise<number> {
+ return this.lq.cancelAllNonRunning();
+ }
+
+ // Internal accessor for runner
+ get _impl(): LQ<T> {
+ return this.lq;
+ }
+}
+
+class LitequeQueueClient implements QueueClient {
+ private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), {
+ walEnabled: serverConfig.database.walMode,
+ });
+
+ private queues = new Map<string, LitequeQueueWrapper<unknown>>();
+
+ async init(): Promise<void> {
+ migrateDB(this.db);
+ }
+
+ createQueue<T>(name: string, options: QueueOptions): Queue<T> {
+ if (this.queues.has(name)) {
+ throw new Error(`Queue ${name} already exists`);
+ }
+ const lq = new LQ<T>(name, this.db, {
+ defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries },
+ keepFailedJobs: options.keepFailedJobs,
+ });
+ const wrapper = new LitequeQueueWrapper<T>(name, lq);
+ this.queues.set(name, wrapper);
+ return wrapper;
+ }
+
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T> {
+ const name = queue.name();
+ let wrapper = this.queues.get(name);
+ if (!wrapper) {
+ throw new Error(`Queue ${name} not found`);
+ }
+
+ const runner = new LQRunner<T>(
+ wrapper._impl,
+ {
+ run: funcs.run,
+ onComplete: funcs.onComplete as
+ | ((job: DequeuedJob<T>) => Promise<void>)
+ | undefined,
+ onError: funcs.onError as
+ | ((job: DequeuedJobError<T>) => Promise<void>)
+ | undefined,
+ },
+ {
+ pollIntervalMs: opts.pollIntervalMs ?? 1000,
+ timeoutSecs: opts.timeoutSecs,
+ concurrency: opts.concurrency,
+ validator: opts.validator,
+ },
+ );
+
+ return {
+ run: () => runner.run(),
+ stop: () => runner.stop(),
+ runUntilEmpty: () => runner.runUntilEmpty(),
+ };
+ }
+
+ async shutdown(): Promise<void> {
+ // No-op for sqlite
+ }
+}
+
+export class LitequeQueueProvider implements PluginProvider<QueueClient> {
+ private client: QueueClient | null = null;
+
+ async getClient(): Promise<QueueClient | null> {
+ if (!this.client) {
+ const client = new LitequeQueueClient();
+ await client.init();
+ this.client = client;
+ }
+ return this.client;
+ }
+}
diff --git a/packages/plugins-queue-liteque/tsconfig.json b/packages/plugins-queue-liteque/tsconfig.json
new file mode 100644
index 00000000..3bfa695c
--- /dev/null
+++ b/packages/plugins-queue-liteque/tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@karakeep/tsconfig/node.json",
+ "include": ["**/*.ts"],
+ "exclude": ["node_modules"],
+ "compilerOptions": {
+ "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
+ }
+}
+
diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json
index 6ba6b6d9..9c1b52a8 100644
--- a/packages/shared-server/package.json
+++ b/packages/shared-server/package.json
@@ -6,6 +6,7 @@
"type": "module",
"dependencies": {
"@karakeep/db": "workspace:^0.1.0",
+ "@karakeep/plugins-queue-liteque": "workspace:^0.1.0",
"@karakeep/plugins-search-meilisearch": "workspace:^0.1.0",
"@karakeep/shared": "workspace:^0.1.0"
},
diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts
index ff3c6abc..d42118c2 100644
--- a/packages/shared-server/src/index.ts
+++ b/packages/shared-server/src/index.ts
@@ -1,2 +1,3 @@
export { loadAllPlugins } from "./plugins";
export { QuotaService, StorageQuotaError } from "./services/quotaService";
+export * from "./queues";
diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts
index 86a0b344..b6a88462 100644
--- a/packages/shared-server/src/plugins.ts
+++ b/packages/shared-server/src/plugins.ts
@@ -6,6 +6,8 @@ export async function loadAllPlugins() {
return;
}
// Load plugins here. Order of plugin loading matter.
+ // Queue provider(s)
+ await import("@karakeep/plugins-queue-liteque");
await import("@karakeep/plugins-search-meilisearch");
PluginManager.logAllPlugins();
pluginsLoaded = true;
diff --git a/packages/shared/queues.ts b/packages/shared-server/src/queues.ts
index cf8920e1..c461c7cb 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -1,18 +1,15 @@
-import path from "node:path";
-import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque";
import { z } from "zod";
-import serverConfig from "./config";
-import { zRuleEngineEventSchema } from "./types/rules";
+import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing";
+import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules";
-const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db");
+import { loadAllPlugins } from ".";
-const queueDB = buildDBClient(QUEUE_DB_PATH, {
- walEnabled: serverConfig.database.walMode,
-});
+await loadAllPlugins();
+const QUEUE_CLIENT = await getQueueClient();
export function runQueueDBMigrations() {
- migrateDB(queueDB);
+ QUEUE_CLIENT.init();
}
// Link Crawler
@@ -23,9 +20,8 @@ export const zCrawlLinkRequestSchema = z.object({
});
export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = new SqliteQueue<ZCrawlLinkRequest>(
+export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 5,
@@ -41,9 +37,8 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = new SqliteQueue<ZOpenAIRequest>(
+export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>(
"openai_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 3,
@@ -60,16 +55,13 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue = new SqliteQueue<ZSearchIndexingRequest>(
- "searching_indexing",
- queueDB,
- {
+export const SearchIndexingQueue =
+ QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", {
defaultJobArgs: {
numRetries: 5,
},
keepFailedJobs: false,
- },
-);
+ });
// Tidy Assets Worker
export const zTidyAssetsRequestSchema = z.object({
@@ -77,9 +69,8 @@ export const zTidyAssetsRequestSchema = z.object({
syncAssetMetadata: z.boolean().optional().default(false),
});
export type ZTidyAssetsRequest = z.infer<typeof zTidyAssetsRequestSchema>;
-export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>(
+export const TidyAssetsQueue = QUEUE_CLIENT.createQueue<ZTidyAssetsRequest>(
"tidy_assets_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 1,
@@ -107,9 +98,8 @@ export const zvideoRequestSchema = z.object({
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
-export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>(
+export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>(
"video_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 5,
@@ -124,9 +114,8 @@ export const zFeedRequestSchema = z.object({
});
export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>;
-export const FeedQueue = new SqliteQueue<ZFeedRequestSchema>(
+export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>(
"feed_queue",
- queueDB,
{
defaultJobArgs: {
// One retry is enough for the feed queue given that it's periodic
@@ -145,9 +134,8 @@ export type AssetPreprocessingRequest = z.infer<
typeof zAssetPreprocessingRequestSchema
>;
export const AssetPreprocessingQueue =
- new SqliteQueue<AssetPreprocessingRequest>(
+ QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>(
"asset_preprocessing_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 2,
@@ -163,9 +151,8 @@ export const zWebhookRequestSchema = z.object({
userId: z.string().optional(),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
-export const WebhookQueue = new SqliteQueue<ZWebhookRequest>(
+export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>(
"webhook_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 3,
@@ -196,9 +183,8 @@ export const zRuleEngineRequestSchema = z.object({
events: z.array(zRuleEngineEventSchema),
});
export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>;
-export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>(
+export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>(
"rule_engine_queue",
- queueDB,
{
defaultJobArgs: {
numRetries: 1,
diff --git a/packages/shared/index.ts b/packages/shared/index.ts
index e449443d..e69de29b 100644
--- a/packages/shared/index.ts
+++ b/packages/shared/index.ts
@@ -1 +0,0 @@
-export * as Queues from "./queues";
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 29879868..93739354 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -9,7 +9,6 @@
"glob": "^11.0.0",
"html-to-text": "^9.0.5",
"js-tiktoken": "^1.0.20",
- "liteque": "^0.6.0",
"nodemailer": "^7.0.4",
"ollama": "^0.5.14",
"openai": "^4.86.1",
diff --git a/packages/shared/plugins.ts b/packages/shared/plugins.ts
index 2ce5826a..2aa7df4a 100644
--- a/packages/shared/plugins.ts
+++ b/packages/shared/plugins.ts
@@ -1,14 +1,17 @@
// Implementation inspired from Outline
+import type { QueueClient } from "./queueing";
import logger from "./logger";
import { SearchIndexClient } from "./search";
export enum PluginType {
Search = "search",
+ Queue = "queue",
}
interface PluginTypeMap {
[PluginType.Search]: SearchIndexClient;
+ [PluginType.Queue]: QueueClient;
}
export interface TPlugin<T extends PluginType> {
@@ -21,37 +24,38 @@ export interface PluginProvider<T> {
getClient(): Promise<T | null>;
}
+// Preserve the key-dependent value type: for K, store TPlugin<K>[]
+type ProviderMap = { [K in PluginType]: TPlugin<K>[] };
+
export class PluginManager {
- private static providers = new Map<PluginType, TPlugin<PluginType>[]>();
+ private static providers: ProviderMap = {
+ [PluginType.Search]: [],
+ [PluginType.Queue]: [],
+ };
static register<T extends PluginType>(plugin: TPlugin<T>): void {
- const p = PluginManager.providers.get(plugin.type);
- if (!p) {
- PluginManager.providers.set(plugin.type, [plugin]);
- return;
- }
- p.push(plugin);
+ PluginManager.providers[plugin.type].push(plugin);
}
static async getClient<T extends PluginType>(
type: T,
): Promise<PluginTypeMap[T] | null> {
- const provider = PluginManager.providers.get(type);
- if (!provider) {
+ const providers: TPlugin<T>[] = PluginManager.providers[type];
+ if (providers.length === 0) {
return null;
}
- return await provider[provider.length - 1].provider.getClient();
+ return await providers[providers.length - 1]!.provider.getClient();
}
static isRegistered<T extends PluginType>(type: T): boolean {
- return !!PluginManager.providers.get(type);
+ return PluginManager.providers[type].length > 0;
}
static logAllPlugins() {
logger.info("Plugins (Last one wins):");
for (const type of Object.values(PluginType)) {
logger.info(` ${type}:`);
- const plugins = PluginManager.providers.get(type);
+ const plugins = PluginManager.providers[type];
if (!plugins) {
logger.info(" - None");
continue;
diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts
new file mode 100644
index 00000000..dfe3b31a
--- /dev/null
+++ b/packages/shared/queueing.ts
@@ -0,0 +1,84 @@
+import { ZodType } from "zod";
+
+import { PluginManager, PluginType } from "./plugins";
+
+export interface EnqueueOptions {
+ numRetries?: number;
+ idempotencyKey?: string;
+ priority?: number;
+ delayMs?: number;
+}
+
+export interface QueueOptions {
+ defaultJobArgs: {
+ numRetries: number;
+ };
+ keepFailedJobs: boolean;
+}
+
+export interface DequeuedJob<T> {
+ id: string;
+ data: T;
+ priority: number;
+ runNumber: number;
+ abortSignal: AbortSignal;
+}
+
+export interface DequeuedJobError<T> {
+ id: string;
+ data?: T;
+ priority: number;
+ error: Error;
+ runNumber: number;
+ numRetriesLeft: number;
+}
+
+export interface RunnerFuncs<T> {
+ run: (job: DequeuedJob<T>) => Promise<void>;
+ onComplete?: (job: DequeuedJob<T>) => Promise<void>;
+ onError?: (job: DequeuedJobError<T>) => Promise<void>;
+}
+
+export interface RunnerOptions<T> {
+ pollIntervalMs?: number;
+ timeoutSecs: number;
+ concurrency: number;
+ validator?: ZodType<T>;
+}
+
+export interface Queue<T> {
+ name(): string;
+ enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>;
+ stats(): Promise<{
+ pending: number;
+ pending_retry: number;
+ running: number;
+ failed: number;
+ }>;
+ cancelAllNonRunning?(): Promise<number>;
+}
+
+export interface Runner<_T> {
+ run(): Promise<void>;
+ stop(): void;
+ runUntilEmpty?(): Promise<void>;
+}
+
+export interface QueueClient {
+ init(): Promise<void>;
+ createQueue<T>(name: string, options: QueueOptions): Queue<T>;
+ createRunner<T>(
+ queue: Queue<T>,
+ funcs: RunnerFuncs<T>,
+ opts: RunnerOptions<T>,
+ ): Runner<T>;
+ shutdown?(): Promise<void>;
+}
+
+export async function getQueueClient(): Promise<QueueClient> {
+ const client = await PluginManager.getClient(PluginType.Queue);
+ if (!client) {
+ throw new Error("Failed to get queue client");
+ }
+ return client;
+}
diff --git a/packages/trpc/lib/__tests__/ruleEngine.test.ts b/packages/trpc/lib/__tests__/ruleEngine.test.ts
index cbb4b978..a108ede7 100644
--- a/packages/trpc/lib/__tests__/ruleEngine.test.ts
+++ b/packages/trpc/lib/__tests__/ruleEngine.test.ts
@@ -15,7 +15,7 @@ import {
tagsOnBookmarks,
users,
} from "@karakeep/db/schema";
-import { LinkCrawlerQueue } from "@karakeep/shared/queues";
+import { LinkCrawlerQueue } from "@karakeep/shared-server";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import {
RuleEngineAction,
@@ -29,7 +29,7 @@ import { TestDB } from "../../testUtils";
import { RuleEngine } from "../ruleEngine";
// Mock the queue
-vi.mock("@karakeep/shared/queues", () => ({
+vi.mock("@karakeep/shared-server", () => ({
LinkCrawlerQueue: {
enqueue: vi.fn(),
},
diff --git a/packages/trpc/lib/ruleEngine.ts b/packages/trpc/lib/ruleEngine.ts
index 0bef8cdc..2d5deae6 100644
--- a/packages/trpc/lib/ruleEngine.ts
+++ b/packages/trpc/lib/ruleEngine.ts
@@ -2,7 +2,7 @@ import deepEql from "deep-equal";
import { and, eq } from "drizzle-orm";
import { bookmarks, tagsOnBookmarks } from "@karakeep/db/schema";
-import { LinkCrawlerQueue } from "@karakeep/shared/queues";
+import { LinkCrawlerQueue } from "@karakeep/shared-server";
import {
RuleEngineAction,
RuleEngineCondition,
diff --git a/packages/trpc/models/lists.ts b/packages/trpc/models/lists.ts
index 39d78ac1..c0e17bfc 100644
--- a/packages/trpc/models/lists.ts
+++ b/packages/trpc/models/lists.ts
@@ -6,7 +6,7 @@ import { z } from "zod";
import { SqliteError } from "@karakeep/db";
import { bookmarkLists, bookmarksInLists } from "@karakeep/db/schema";
-import { triggerRuleEngineOnEvent } from "@karakeep/shared/queues";
+import { triggerRuleEngineOnEvent } from "@karakeep/shared-server";
import { parseSearchQuery } from "@karakeep/shared/searchQueryParser";
import { ZSortOrder } from "@karakeep/shared/types/bookmarks";
import {
diff --git a/packages/trpc/models/tags.ts b/packages/trpc/models/tags.ts
index 79cd855b..a91dcbdf 100644
--- a/packages/trpc/models/tags.ts
+++ b/packages/trpc/models/tags.ts
@@ -5,7 +5,7 @@ import { z } from "zod";
import type { ZAttachedByEnum } from "@karakeep/shared/types/tags";
import { SqliteError } from "@karakeep/db";
import { bookmarkTags, tagsOnBookmarks } from "@karakeep/db/schema";
-import { triggerSearchReindex } from "@karakeep/shared/queues";
+import { triggerSearchReindex } from "@karakeep/shared-server";
import {
zCreateTagRequestSchema,
zGetTagResponseSchema,
diff --git a/packages/trpc/package.json b/packages/trpc/package.json
index c4e16675..d1896a0b 100644
--- a/packages/trpc/package.json
+++ b/packages/trpc/package.json
@@ -20,7 +20,6 @@
"bcryptjs": "^2.4.3",
"deep-equal": "^2.2.3",
"drizzle-orm": "^0.44.2",
- "liteque": "^0.6.0",
"nodemailer": "^7.0.4",
"prom-client": "^15.1.3",
"stripe": "^18.3.0",
diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts
index e005c3dd..25425eaf 100644
--- a/packages/trpc/routers/admin.ts
+++ b/packages/trpc/routers/admin.ts
@@ -13,7 +13,7 @@ import {
triggerSearchReindex,
VideoWorkerQueue,
WebhookQueue,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
import { getSearchClient } from "@karakeep/shared/search";
import {
resetPasswordSchema,
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index efd295f7..3399bf19 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -1,6 +1,5 @@
import { experimental_trpcMiddleware, TRPCError } from "@trpc/server";
import { and, eq, gt, inArray, lt, or } from "drizzle-orm";
-import { EnqueueOptions } from "liteque";
import invariant from "tiny-invariant";
import { z } from "zod";
@@ -21,23 +20,24 @@ import {
customPrompts,
tagsOnBookmarks,
} from "@karakeep/db/schema";
-import { QuotaService } from "@karakeep/shared-server";
-import {
- deleteAsset,
- SUPPORTED_BOOKMARK_ASSET_TYPES,
-} from "@karakeep/shared/assetdb";
-import serverConfig from "@karakeep/shared/config";
-import { InferenceClientFactory } from "@karakeep/shared/inference";
-import { buildSummaryPrompt } from "@karakeep/shared/prompts";
import {
AssetPreprocessingQueue,
LinkCrawlerQueue,
OpenAIQueue,
+ QuotaService,
SearchIndexingQueue,
triggerRuleEngineOnEvent,
triggerSearchReindex,
triggerWebhook,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import {
+ deleteAsset,
+ SUPPORTED_BOOKMARK_ASSET_TYPES,
+} from "@karakeep/shared/assetdb";
+import serverConfig from "@karakeep/shared/config";
+import { InferenceClientFactory } from "@karakeep/shared/inference";
+import { buildSummaryPrompt } from "@karakeep/shared/prompts";
+import { EnqueueOptions } from "@karakeep/shared/queueing";
import { FilterQuery, getSearchClient } from "@karakeep/shared/search";
import { parseSearchQuery } from "@karakeep/shared/searchQueryParser";
import {
diff --git a/packages/trpc/routers/feeds.ts b/packages/trpc/routers/feeds.ts
index 27eefdf1..57c88084 100644
--- a/packages/trpc/routers/feeds.ts
+++ b/packages/trpc/routers/feeds.ts
@@ -1,6 +1,6 @@
import { z } from "zod";
-import { FeedQueue } from "@karakeep/shared/queues";
+import { FeedQueue } from "@karakeep/shared-server";
import {
zFeedSchema,
zNewFeedSchema,
diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts
index 9aef42ef..c6d5c94c 100644
--- a/packages/trpc/stats.ts
+++ b/packages/trpc/stats.ts
@@ -13,7 +13,7 @@ import {
TidyAssetsQueue,
VideoWorkerQueue,
WebhookQueue,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
// Queue metrics
const queuePendingJobsGauge = new Gauge({
diff --git a/packages/trpc/testUtils.ts b/packages/trpc/testUtils.ts
index 1cc4e727..b8fe3c30 100644
--- a/packages/trpc/testUtils.ts
+++ b/packages/trpc/testUtils.ts
@@ -77,20 +77,25 @@ export async function buildTestContext(
export function defaultBeforeEach(seedDB = true) {
return async (context: object) => {
- vi.mock("@karakeep/shared/queues", () => ({
- LinkCrawlerQueue: {
- enqueue: vi.fn(),
- },
- OpenAIQueue: {
- enqueue: vi.fn(),
- },
- SearchIndexingQueue: {
- enqueue: vi.fn(),
- },
- triggerRuleEngineOnEvent: vi.fn(),
- triggerSearchReindex: vi.fn(),
- triggerWebhook: vi.fn(),
- }));
+ vi.mock("@karakeep/shared-server", async (original) => {
+ const mod =
+ (await original()) as typeof import("@karakeep/shared-server");
+ return {
+ ...mod,
+ LinkCrawlerQueue: {
+ enqueue: vi.fn(),
+ },
+ OpenAIQueue: {
+ enqueue: vi.fn(),
+ },
+ SearchIndexingQueue: {
+ enqueue: vi.fn(),
+ },
+ triggerRuleEngineOnEvent: vi.fn(),
+ triggerSearchReindex: vi.fn(),
+ triggerWebhook: vi.fn(),
+ };
+ });
Object.assign(context, await buildTestContext(seedDB));
};
}
diff --git a/packages/trpc/vitest.config.ts b/packages/trpc/vitest.config.ts
index 5af4ad16..41fd70c4 100644
--- a/packages/trpc/vitest.config.ts
+++ b/packages/trpc/vitest.config.ts
@@ -10,9 +10,5 @@ export default defineConfig({
alias: {
"@/*": "./*",
},
- deps: {
- // TODO: this need to be fixed
- inline: ["liteque"],
- },
},
});
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 55040814..dc31796c 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -832,9 +832,6 @@ importers:
jsdom:
specifier: ^24.0.0
version: 24.1.3
- liteque:
- specifier: ^0.6.0
- version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
metascraper:
specifier: ^5.46.18
version: 5.47.1
@@ -1139,6 +1136,28 @@ importers:
specifier: ^4.8.1
version: 4.20.3
+ packages/plugins-queue-liteque:
+ dependencies:
+ '@karakeep/shared':
+ specifier: workspace:*
+ version: link:../shared
+ liteque:
+ specifier: ^0.6.0
+ version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
+ devDependencies:
+ '@karakeep/prettier-config':
+ specifier: workspace:^0.1.0
+ version: link:../../tooling/prettier
+ '@karakeep/tsconfig':
+ specifier: workspace:^0.1.0
+ version: link:../../tooling/typescript
+ vite-tsconfig-paths:
+ specifier: ^4.3.1
+ version: 4.3.2(typescript@5.8.3)(vite@7.0.6(@types/node@22.15.30)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0))
+ vitest:
+ specifier: ^3.2.4
+ version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.30)(happy-dom@17.4.9)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0)
+
packages/plugins-search-meilisearch:
dependencies:
'@karakeep/shared':
@@ -1203,9 +1222,6 @@ importers:
js-tiktoken:
specifier: ^1.0.20
version: 1.0.20
- liteque:
- specifier: ^0.6.0
- version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
nodemailer:
specifier: ^7.0.4
version: 7.0.4
@@ -1283,6 +1299,9 @@ importers:
'@karakeep/db':
specifier: workspace:^0.1.0
version: link:../db
+ '@karakeep/plugins-queue-liteque':
+ specifier: workspace:^0.1.0
+ version: link:../plugins-queue-liteque
'@karakeep/plugins-search-meilisearch':
specifier: workspace:^0.1.0
version: link:../plugins-search-meilisearch
@@ -1320,9 +1339,6 @@ importers:
drizzle-orm:
specifier: ^0.44.2
version: 0.44.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@11.3.0)(gel@2.1.0)(kysely@0.28.5)
- liteque:
- specifier: ^0.6.0
- version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0)
nodemailer:
specifier: ^7.0.4
version: 7.0.4