From 8d32055485858210252096483bb20533dc8bdf60 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 14 Sep 2025 18:16:40 +0000 Subject: refactor: Move callsites to liteque to be behind a plugin --- apps/workers/index.ts | 3 +- apps/workers/package.json | 1 - apps/workers/workers/assetPreprocessingWorker.ts | 70 ++++--- apps/workers/workers/crawlerWorker.ts | 28 +-- apps/workers/workers/feedWorker.ts | 11 +- apps/workers/workers/inference/inferenceWorker.ts | 10 +- apps/workers/workers/inference/summarize.ts | 4 +- apps/workers/workers/inference/tagging.ts | 14 +- apps/workers/workers/ruleEngineWorker.ts | 14 +- apps/workers/workers/searchWorker.ts | 59 +++--- apps/workers/workers/tidyAssetsWorker.ts | 12 +- apps/workers/workers/videoWorker.ts | 19 +- apps/workers/workers/webhookWorker.ts | 12 +- packages/plugins-queue-liteque/.oxlintrc.json | 19 ++ packages/plugins-queue-liteque/index.ts | 10 + packages/plugins-queue-liteque/package.json | 27 +++ packages/plugins-queue-liteque/src/index.ts | 133 +++++++++++++ packages/plugins-queue-liteque/tsconfig.json | 10 + packages/shared-server/package.json | 1 + packages/shared-server/src/index.ts | 1 + packages/shared-server/src/plugins.ts | 2 + packages/shared-server/src/queues.ts | 208 ++++++++++++++++++++ packages/shared/index.ts | 1 - packages/shared/package.json | 1 - packages/shared/plugins.ts | 28 +-- packages/shared/queueing.ts | 84 ++++++++ packages/shared/queues.ts | 222 ---------------------- packages/trpc/lib/__tests__/ruleEngine.test.ts | 4 +- packages/trpc/lib/ruleEngine.ts | 2 +- packages/trpc/models/lists.ts | 2 +- packages/trpc/models/tags.ts | 2 +- packages/trpc/package.json | 1 - packages/trpc/routers/admin.ts | 2 +- packages/trpc/routers/bookmarks.ts | 20 +- packages/trpc/routers/feeds.ts | 2 +- packages/trpc/stats.ts | 2 +- packages/trpc/testUtils.ts | 33 ++-- packages/trpc/vitest.config.ts | 4 - pnpm-lock.yaml | 34 +++- 39 files changed, 707 insertions(+), 405 deletions(-) create mode 100644 packages/plugins-queue-liteque/.oxlintrc.json create mode 100644 packages/plugins-queue-liteque/index.ts create mode 100644 packages/plugins-queue-liteque/package.json create mode 100644 packages/plugins-queue-liteque/src/index.ts create mode 100644 packages/plugins-queue-liteque/tsconfig.json create mode 100644 packages/shared-server/src/queues.ts create mode 100644 packages/shared/queueing.ts delete mode 100644 packages/shared/queues.ts diff --git a/apps/workers/index.ts b/apps/workers/index.ts index cd860fc0..578ff6c8 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -2,10 +2,9 @@ import "dotenv/config"; import { buildServer } from "server"; -import { loadAllPlugins } from "@karakeep/shared-server"; +import { loadAllPlugins, runQueueDBMigrations } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; -import { runQueueDBMigrations } from "@karakeep/shared/queues"; import { shutdownPromise } from "./exit"; import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; diff --git a/apps/workers/package.json b/apps/workers/package.json index 6de801dd..030250e0 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -24,7 +24,6 @@ "http-proxy-agent": "^7.0.2", "https-proxy-agent": "^7.0.6", "jsdom": "^24.0.0", - "liteque": "^0.6.0", "metascraper": "^5.46.18", "metascraper-amazon": "^5.46.18", "metascraper-author": "5.46.18", diff --git a/apps/workers/workers/assetPreprocessingWorker.ts 
b/apps/workers/workers/assetPreprocessingWorker.ts index d2df1724..2dcec59b 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -1,12 +1,11 @@ import os from "os"; import { eq } from "drizzle-orm"; -import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; -import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; +import type { AssetPreprocessingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { assets, @@ -14,44 +13,53 @@ import { bookmarkAssets, bookmarks, } from "@karakeep/db/schema"; -import { QuotaService, StorageQuotaError } from "@karakeep/shared-server"; -import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { AssetPreprocessingQueue, OpenAIQueue, + QuotaService, + StorageQuotaError, triggerSearchReindex, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + DequeuedJob, + EnqueueOptions, + getQueueClient, +} from "@karakeep/shared/queueing"; export class AssetPreprocessingWorker { - static build() { + static async build() { logger.info("Starting asset preprocessing worker ..."); - const worker = new Runner( - AssetPreprocessingQueue, - { - run: run, - onComplete: async (job) => { - workerStatsCounter.labels("assetPreprocessing", "completed").inc(); - const jobId = job.id; - logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); - return Promise.resolve(); + const worker = + (await getQueueClient())!.createRunner( + AssetPreprocessingQueue, + { + run: run, + onComplete: async (job) => { + workerStatsCounter.labels("assetPreprocessing", "completed").inc(); + const jobId = job.id; + logger.info( + `[assetPreprocessing][${jobId}] Completed successfully`, + ); + return Promise.resolve(); + }, + onError: async (job) => { + workerStatsCounter.labels("assetPreProcessing", "failed").inc(); + const jobId = job.id; + logger.error( + `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, }, - onError: async (job) => { - workerStatsCounter.labels("assetPreProcessing", "failed").inc(); - const jobId = job.id; - logger.error( - `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); + { + concurrency: serverConfig.assetPreprocessing.numWorkers, + pollIntervalMs: 1000, + timeoutSecs: 30, }, - }, - { - concurrency: serverConfig.assetPreprocessing.numWorkers, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); + ); return worker; } diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 79d8e06a..baea1346 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -10,7 +10,6 @@ import { eq } from "drizzle-orm"; import { execa } from "execa"; import { exitAbortController } from "exit"; import { JSDOM, VirtualConsole } from "jsdom"; -import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import metascraper from "metascraper"; import 
metascraperAmazon from "metascraper-amazon"; import metascraperAuthor from "metascraper-author"; @@ -30,7 +29,7 @@ import { fetchWithProxy } from "utils"; import { getBookmarkDetails, updateAsset } from "workerUtils"; import { z } from "zod"; -import type { ZCrawlLinkRequest } from "@karakeep/shared/queues"; +import type { ZCrawlLinkRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { assets, @@ -40,7 +39,16 @@ import { bookmarks, users, } from "@karakeep/db/schema"; -import { QuotaService } from "@karakeep/shared-server"; +import { + AssetPreprocessingQueue, + LinkCrawlerQueue, + OpenAIQueue, + QuotaService, + triggerSearchReindex, + triggerWebhook, + VideoWorkerQueue, + zCrawlLinkRequestSchema, +} from "@karakeep/shared-server"; import { ASSET_TYPES, getAssetSize, @@ -55,14 +63,10 @@ import { import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { - AssetPreprocessingQueue, - LinkCrawlerQueue, - OpenAIQueue, - triggerSearchReindex, - triggerWebhook, - VideoWorkerQueue, - zCrawlLinkRequestSchema, -} from "@karakeep/shared/queues"; + DequeuedJob, + EnqueueOptions, + getQueueClient, +} from "@karakeep/shared/queueing"; import { tryCatch } from "@karakeep/shared/tryCatch"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; @@ -247,7 +251,7 @@ export class CrawlerWorker { } logger.info("Starting crawler worker ..."); - const worker = new Runner( + const worker = (await getQueueClient())!.createRunner( LinkCrawlerQueue, { run: runCrawler, diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts index 2ece4890..048d9696 100644 --- a/apps/workers/workers/feedWorker.ts +++ b/apps/workers/workers/feedWorker.ts @@ -1,5 +1,4 @@ import { and, eq, inArray } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import cron from "node-cron"; import Parser from "rss-parser"; @@ -7,12 +6,12 @@ import { buildImpersonatingTRPCClient } from "trpc"; import { fetchWithProxy } from "utils"; import { z } from "zod"; -import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; +import type { ZFeedRequestSchema } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; -import { QuotaService } from "@karakeep/shared-server"; +import { FeedQueue, QuotaService } from "@karakeep/shared-server"; import logger from "@karakeep/shared/logger"; -import { FeedQueue } from "@karakeep/shared/queues"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; export const FeedRefreshingWorker = cron.schedule( @@ -46,9 +45,9 @@ export const FeedRefreshingWorker = cron.schedule( ); export class FeedWorker { - static build() { + static async build() { logger.info("Starting feed worker ..."); - const worker = new Runner( + const worker = (await getQueueClient())!.createRunner( FeedQueue, { run: run, diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts index 32de3806..065462b3 100644 --- a/apps/workers/workers/inference/inferenceWorker.ts +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -1,14 +1,14 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; -import type { ZOpenAIRequest } from "@karakeep/shared/queues"; +import type { 
ZOpenAIRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; +import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import { InferenceClientFactory } from "@karakeep/shared/inference"; import logger from "@karakeep/shared/logger"; -import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { runSummarization } from "./summarize"; import { runTagging } from "./tagging"; @@ -37,9 +37,9 @@ async function attemptMarkStatus( } export class OpenAiWorker { - static build() { + static async build() { logger.info("Starting inference worker ..."); - const worker = new Runner( + const worker = (await getQueueClient())!.createRunner( OpenAIQueue, { run: runOpenAI, diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 02a7f4e0..0b25470e 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -1,13 +1,13 @@ import { and, eq } from "drizzle-orm"; -import { DequeuedJob } from "liteque"; import { db } from "@karakeep/db"; import { bookmarks, customPrompts } from "@karakeep/db/schema"; +import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import { InferenceClient } from "@karakeep/shared/inference"; import logger from "@karakeep/shared/logger"; import { buildSummaryPrompt } from "@karakeep/shared/prompts"; -import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues"; +import { DequeuedJob } from "@karakeep/shared/queueing"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 6dd146af..0d245644 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -1,13 +1,12 @@ import { and, Column, eq, inArray, sql } from "drizzle-orm"; -import { DequeuedJob, EnqueueOptions } from "liteque"; import { buildImpersonatingTRPCClient } from "trpc"; import { z } from "zod"; +import type { ZOpenAIRequest } from "@karakeep/shared-server"; import type { InferenceClient, InferenceResponse, } from "@karakeep/shared/inference"; -import type { ZOpenAIRequest } from "@karakeep/shared/queues"; import { db } from "@karakeep/db"; import { bookmarks, @@ -15,15 +14,16 @@ import { customPrompts, tagsOnBookmarks, } from "@karakeep/db/schema"; -import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; import { triggerRuleEngineOnEvent, triggerSearchReindex, triggerWebhook, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; +import { DequeuedJob, EnqueueOptions } from "@karakeep/shared/queueing"; import { Bookmark } from "@karakeep/trpc/models/bookmarks"; const openAIResponseSchema = z.object({ diff --git 
a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts index 2a4fbb1a..37c7f595 100644 --- a/apps/workers/workers/ruleEngineWorker.ts +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -1,23 +1,23 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import { buildImpersonatingAuthedContext } from "trpc"; -import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; +import type { ZRuleEngineRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { RuleEngineQueue, zRuleEngineRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; export class RuleEngineWorker { - static build() { + static async build() { logger.info("Starting rule engine worker ..."); - const worker = new Runner( + const worker = (await getQueueClient())!.createRunner( RuleEngineQueue, { run: runRuleEngine, diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index 1433ef3c..8f043c1d 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -1,16 +1,16 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; -import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; +import type { ZSearchIndexingRequest } from "@karakeep/shared-server"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { SearchIndexingQueue, zSearchIndexingRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { BookmarkSearchDocument, getSearchClient, @@ -19,33 +19,34 @@ import { import { Bookmark } from "@karakeep/trpc/models/bookmarks"; export class SearchIndexingWorker { - static build() { + static async build() { logger.info("Starting search indexing worker ..."); - const worker = new Runner( - SearchIndexingQueue, - { - run: runSearchIndexing, - onComplete: (job) => { - workerStatsCounter.labels("search", "completed").inc(); - const jobId = job.id; - logger.info(`[search][${jobId}] Completed successfully`); - return Promise.resolve(); + const worker = + (await getQueueClient())!.createRunner( + SearchIndexingQueue, + { + run: runSearchIndexing, + onComplete: (job) => { + workerStatsCounter.labels("search", "completed").inc(); + const jobId = job.id; + logger.info(`[search][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + workerStatsCounter.labels("search", "failed").inc(); + const jobId = job.id; + logger.error( + `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, }, - onError: (job) => { - workerStatsCounter.labels("search", "failed").inc(); - const jobId = job.id; - logger.error( - 
`[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); + { + concurrency: serverConfig.search.numWorkers, + pollIntervalMs: 1000, + timeoutSecs: 30, }, - }, - { - concurrency: serverConfig.search.numWorkers, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); + ); return worker; } diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts index cf3e33b6..b5b95185 100644 --- a/apps/workers/workers/tidyAssetsWorker.ts +++ b/apps/workers/workers/tidyAssetsWorker.ts @@ -1,21 +1,21 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import { db } from "@karakeep/db"; import { assets } from "@karakeep/db/schema"; -import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; -import logger from "@karakeep/shared/logger"; import { TidyAssetsQueue, ZTidyAssetsRequest, zTidyAssetsRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; export class TidyAssetsWorker { - static build() { + static async build() { logger.info("Starting tidy assets worker ..."); - const worker = new Runner( + const worker = (await getQueueClient())!.createRunner( TidyAssetsQueue, { run: runTidyAssets, diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts index 68be0126..a41eb069 100644 --- a/apps/workers/workers/videoWorker.ts +++ b/apps/workers/workers/videoWorker.ts @@ -2,12 +2,17 @@ import fs from "fs"; import * as os from "os"; import path from "path"; import { execa } from "execa"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import { db } from "@karakeep/db"; import { AssetTypes } from "@karakeep/db/schema"; -import { QuotaService, StorageQuotaError } from "@karakeep/shared-server"; +import { + QuotaService, + StorageQuotaError, + VideoWorkerQueue, + ZVideoRequest, + zvideoRequestSchema, +} from "@karakeep/shared-server"; import { ASSET_TYPES, newAssetId, @@ -16,21 +21,17 @@ import { } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; -import { - VideoWorkerQueue, - ZVideoRequest, - zvideoRequestSchema, -} from "@karakeep/shared/queues"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; import { getBookmarkDetails, updateAsset } from "../workerUtils"; const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads"); export class VideoWorker { - static build() { + static async build() { logger.info("Starting video worker ..."); - return new Runner( + return (await getQueueClient())!.createRunner( VideoWorkerQueue, { run: runWorker, diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index 96070e22..2bbef160 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -1,22 +1,22 @@ import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; import { workerStatsCounter } from "metrics"; import fetch from "node-fetch"; import { db } from "@karakeep/db"; import { bookmarks, webhooksTable } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; import { WebhookQueue, 
ZWebhookRequest, zWebhookRequestSchema, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; export class WebhookWorker { - static build() { + static async build() { logger.info("Starting webhook worker ..."); - const worker = new Runner( + const worker = (await getQueueClient())!.createRunner( WebhookQueue, { run: runWebhook, diff --git a/packages/plugins-queue-liteque/.oxlintrc.json b/packages/plugins-queue-liteque/.oxlintrc.json new file mode 100644 index 00000000..79ba0255 --- /dev/null +++ b/packages/plugins-queue-liteque/.oxlintrc.json @@ -0,0 +1,19 @@ +{ + "$schema": "../../node_modules/oxlint/configuration_schema.json", + "extends": [ + "../../tooling/oxlint/oxlint-base.json" + ], + "env": { + "builtin": true, + "commonjs": true + }, + "ignorePatterns": [ + "**/*.config.js", + "**/*.config.cjs", + "**/.eslintrc.cjs", + "**/.next", + "**/dist", + "**/build", + "**/pnpm-lock.yaml" + ] +} diff --git a/packages/plugins-queue-liteque/index.ts b/packages/plugins-queue-liteque/index.ts new file mode 100644 index 00000000..c3f7f03b --- /dev/null +++ b/packages/plugins-queue-liteque/index.ts @@ -0,0 +1,10 @@ +// Auto-register the Liteque queue provider when this package is imported +import { PluginManager, PluginType } from "@karakeep/shared/plugins"; + +import { LitequeQueueProvider } from "./src"; + +PluginManager.register({ + type: PluginType.Queue, + name: "Liteque", + provider: new LitequeQueueProvider(), +}); diff --git a/packages/plugins-queue-liteque/package.json b/packages/plugins-queue-liteque/package.json new file mode 100644 index 00000000..0a3916c1 --- /dev/null +++ b/packages/plugins-queue-liteque/package.json @@ -0,0 +1,27 @@ +{ + "$schema": "https://json.schemastore.org/package.json", + "name": "@karakeep/plugins-queue-liteque", + "version": "0.1.0", + "private": true, + "type": "module", + "scripts": { + "typecheck": "tsc --noEmit", + "format": "prettier . --ignore-path ../../.prettierignore", + "format:fix": "prettier . --write --ignore-path ../../.prettierignore", + "lint": "oxlint .", + "lint:fix": "oxlint . 
--fix", + "test": "vitest" + }, + "dependencies": { + "@karakeep/shared": "workspace:*", + "liteque": "^0.6.0" + }, + "devDependencies": { + "@karakeep/prettier-config": "workspace:^0.1.0", + "@karakeep/tsconfig": "workspace:^0.1.0", + "vite-tsconfig-paths": "^4.3.1", + "vitest": "^3.2.4" + }, + "prettier": "@karakeep/prettier-config" +} + diff --git a/packages/plugins-queue-liteque/src/index.ts b/packages/plugins-queue-liteque/src/index.ts new file mode 100644 index 00000000..3da161d8 --- /dev/null +++ b/packages/plugins-queue-liteque/src/index.ts @@ -0,0 +1,133 @@ +import path from "node:path"; +import { + buildDBClient, + SqliteQueue as LQ, + Runner as LQRunner, + migrateDB, +} from "liteque"; + +import type { PluginProvider } from "@karakeep/shared/plugins"; +import type { + DequeuedJob, + DequeuedJobError, + EnqueueOptions, + Queue, + QueueClient, + QueueOptions, + Runner, + RunnerFuncs, + RunnerOptions, +} from "@karakeep/shared/queueing"; +import serverConfig from "@karakeep/shared/config"; + +class LitequeQueueWrapper implements Queue { + constructor( + private readonly _name: string, + private readonly lq: LQ, + ) {} + + name(): string { + return this._name; + } + + async enqueue( + payload: T, + options?: EnqueueOptions, + ): Promise { + const job = await this.lq.enqueue(payload, options); + // liteque returns a Job with numeric id + return job ? String(job.id) : undefined; + } + + async stats() { + return this.lq.stats(); + } + + async cancelAllNonRunning(): Promise { + return this.lq.cancelAllNonRunning(); + } + + // Internal accessor for runner + get _impl(): LQ { + return this.lq; + } +} + +class LitequeQueueClient implements QueueClient { + private db = buildDBClient(path.join(serverConfig.dataDir, "queue.db"), { + walEnabled: serverConfig.database.walMode, + }); + + private queues = new Map>(); + + async init(): Promise { + migrateDB(this.db); + } + + createQueue(name: string, options: QueueOptions): Queue { + if (this.queues.has(name)) { + throw new Error(`Queue ${name} already exists`); + } + const lq = new LQ(name, this.db, { + defaultJobArgs: { numRetries: options.defaultJobArgs.numRetries }, + keepFailedJobs: options.keepFailedJobs, + }); + const wrapper = new LitequeQueueWrapper(name, lq); + this.queues.set(name, wrapper); + return wrapper; + } + + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner { + const name = queue.name(); + let wrapper = this.queues.get(name); + if (!wrapper) { + throw new Error(`Queue ${name} not found`); + } + + const runner = new LQRunner( + wrapper._impl, + { + run: funcs.run, + onComplete: funcs.onComplete as + | ((job: DequeuedJob) => Promise) + | undefined, + onError: funcs.onError as + | ((job: DequeuedJobError) => Promise) + | undefined, + }, + { + pollIntervalMs: opts.pollIntervalMs ?? 
1000, + timeoutSecs: opts.timeoutSecs, + concurrency: opts.concurrency, + validator: opts.validator, + }, + ); + + return { + run: () => runner.run(), + stop: () => runner.stop(), + runUntilEmpty: () => runner.runUntilEmpty(), + }; + } + + async shutdown(): Promise { + // No-op for sqlite + } +} + +export class LitequeQueueProvider implements PluginProvider { + private client: QueueClient | null = null; + + async getClient(): Promise { + if (!this.client) { + const client = new LitequeQueueClient(); + await client.init(); + this.client = client; + } + return this.client; + } +} diff --git a/packages/plugins-queue-liteque/tsconfig.json b/packages/plugins-queue-liteque/tsconfig.json new file mode 100644 index 00000000..3bfa695c --- /dev/null +++ b/packages/plugins-queue-liteque/tsconfig.json @@ -0,0 +1,10 @@ +{ + "$schema": "https://json.schemastore.org/tsconfig", + "extends": "@karakeep/tsconfig/node.json", + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "compilerOptions": { + "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json" + } +} + diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json index 6ba6b6d9..9c1b52a8 100644 --- a/packages/shared-server/package.json +++ b/packages/shared-server/package.json @@ -6,6 +6,7 @@ "type": "module", "dependencies": { "@karakeep/db": "workspace:^0.1.0", + "@karakeep/plugins-queue-liteque": "workspace:^0.1.0", "@karakeep/plugins-search-meilisearch": "workspace:^0.1.0", "@karakeep/shared": "workspace:^0.1.0" }, diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts index ff3c6abc..d42118c2 100644 --- a/packages/shared-server/src/index.ts +++ b/packages/shared-server/src/index.ts @@ -1,2 +1,3 @@ export { loadAllPlugins } from "./plugins"; export { QuotaService, StorageQuotaError } from "./services/quotaService"; +export * from "./queues"; diff --git a/packages/shared-server/src/plugins.ts b/packages/shared-server/src/plugins.ts index 86a0b344..b6a88462 100644 --- a/packages/shared-server/src/plugins.ts +++ b/packages/shared-server/src/plugins.ts @@ -6,6 +6,8 @@ export async function loadAllPlugins() { return; } // Load plugins here. Order of plugin loading matter. 
+ // Queue provider(s) + await import("@karakeep/plugins-queue-liteque"); await import("@karakeep/plugins-search-meilisearch"); PluginManager.logAllPlugins(); pluginsLoaded = true; diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts new file mode 100644 index 00000000..c461c7cb --- /dev/null +++ b/packages/shared-server/src/queues.ts @@ -0,0 +1,208 @@ +import { z } from "zod"; + +import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; + +import { loadAllPlugins } from "."; + +await loadAllPlugins(); +const QUEUE_CLIENT = await getQueueClient(); + +export function runQueueDBMigrations() { + QUEUE_CLIENT.init(); +} + +// Link Crawler +export const zCrawlLinkRequestSchema = z.object({ + bookmarkId: z.string(), + runInference: z.boolean().optional(), + archiveFullPage: z.boolean().optional().default(false), +}); +export type ZCrawlLinkRequest = z.input; + +export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue( + "link_crawler_queue", + { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }, +); + +// Inference Worker +export const zOpenAIRequestSchema = z.object({ + bookmarkId: z.string(), + type: z.enum(["summarize", "tag"]).default("tag"), +}); +export type ZOpenAIRequest = z.infer; + +export const OpenAIQueue = QUEUE_CLIENT.createQueue( + "openai_queue", + { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }, +); + +// Search Indexing Worker +export const zSearchIndexingRequestSchema = z.object({ + bookmarkId: z.string(), + type: z.enum(["index", "delete"]), +}); +export type ZSearchIndexingRequest = z.infer< + typeof zSearchIndexingRequestSchema +>; +export const SearchIndexingQueue = + QUEUE_CLIENT.createQueue("searching_indexing", { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }); + +// Tidy Assets Worker +export const zTidyAssetsRequestSchema = z.object({ + cleanDanglingAssets: z.boolean().optional().default(false), + syncAssetMetadata: z.boolean().optional().default(false), +}); +export type ZTidyAssetsRequest = z.infer; +export const TidyAssetsQueue = QUEUE_CLIENT.createQueue( + "tidy_assets_queue", + { + defaultJobArgs: { + numRetries: 1, + }, + keepFailedJobs: false, + }, +); + +export async function triggerSearchReindex( + bookmarkId: string, + opts?: EnqueueOptions, +) { + await SearchIndexingQueue.enqueue( + { + bookmarkId, + type: "index", + }, + opts, + ); +} + +export const zvideoRequestSchema = z.object({ + bookmarkId: z.string(), + url: z.string(), +}); +export type ZVideoRequest = z.infer; + +export const VideoWorkerQueue = QUEUE_CLIENT.createQueue( + "video_queue", + { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }, +); + +// Feed Worker +export const zFeedRequestSchema = z.object({ + feedId: z.string(), +}); +export type ZFeedRequestSchema = z.infer; + +export const FeedQueue = QUEUE_CLIENT.createQueue( + "feed_queue", + { + defaultJobArgs: { + // One retry is enough for the feed queue given that it's periodic + numRetries: 1, + }, + keepFailedJobs: false, + }, +); + +// Preprocess Assets +export const zAssetPreprocessingRequestSchema = z.object({ + bookmarkId: z.string(), + fixMode: z.boolean().optional().default(false), +}); +export type AssetPreprocessingRequest = z.infer< + typeof zAssetPreprocessingRequestSchema +>; +export const AssetPreprocessingQueue = + QUEUE_CLIENT.createQueue( + "asset_preprocessing_queue", + { + defaultJobArgs: { + numRetries: 2, 
+ }, + keepFailedJobs: false, + }, + ); + +// Webhook worker +export const zWebhookRequestSchema = z.object({ + bookmarkId: z.string(), + operation: z.enum(["crawled", "created", "edited", "ai tagged", "deleted"]), + userId: z.string().optional(), +}); +export type ZWebhookRequest = z.infer; +export const WebhookQueue = QUEUE_CLIENT.createQueue( + "webhook_queue", + { + defaultJobArgs: { + numRetries: 3, + }, + keepFailedJobs: false, + }, +); + +export async function triggerWebhook( + bookmarkId: string, + operation: ZWebhookRequest["operation"], + userId?: string, + opts?: EnqueueOptions, +) { + await WebhookQueue.enqueue( + { + bookmarkId, + userId, + operation, + }, + opts, + ); +} + +// RuleEngine worker +export const zRuleEngineRequestSchema = z.object({ + bookmarkId: z.string(), + events: z.array(zRuleEngineEventSchema), +}); +export type ZRuleEngineRequest = z.infer; +export const RuleEngineQueue = QUEUE_CLIENT.createQueue( + "rule_engine_queue", + { + defaultJobArgs: { + numRetries: 1, + }, + keepFailedJobs: false, + }, +); + +export async function triggerRuleEngineOnEvent( + bookmarkId: string, + events: z.infer[], + opts?: EnqueueOptions, +) { + await RuleEngineQueue.enqueue( + { + events, + bookmarkId, + }, + opts, + ); +} diff --git a/packages/shared/index.ts b/packages/shared/index.ts index e449443d..e69de29b 100644 --- a/packages/shared/index.ts +++ b/packages/shared/index.ts @@ -1 +0,0 @@ -export * as Queues from "./queues"; diff --git a/packages/shared/package.json b/packages/shared/package.json index 29879868..93739354 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -9,7 +9,6 @@ "glob": "^11.0.0", "html-to-text": "^9.0.5", "js-tiktoken": "^1.0.20", - "liteque": "^0.6.0", "nodemailer": "^7.0.4", "ollama": "^0.5.14", "openai": "^4.86.1", diff --git a/packages/shared/plugins.ts b/packages/shared/plugins.ts index 2ce5826a..2aa7df4a 100644 --- a/packages/shared/plugins.ts +++ b/packages/shared/plugins.ts @@ -1,14 +1,17 @@ // Implementation inspired from Outline +import type { QueueClient } from "./queueing"; import logger from "./logger"; import { SearchIndexClient } from "./search"; export enum PluginType { Search = "search", + Queue = "queue", } interface PluginTypeMap { [PluginType.Search]: SearchIndexClient; + [PluginType.Queue]: QueueClient; } export interface TPlugin { @@ -21,37 +24,38 @@ export interface PluginProvider { getClient(): Promise; } +// Preserve the key-dependent value type: for K, store TPlugin[] +type ProviderMap = { [K in PluginType]: TPlugin[] }; + export class PluginManager { - private static providers = new Map[]>(); + private static providers: ProviderMap = { + [PluginType.Search]: [], + [PluginType.Queue]: [], + }; static register(plugin: TPlugin): void { - const p = PluginManager.providers.get(plugin.type); - if (!p) { - PluginManager.providers.set(plugin.type, [plugin]); - return; - } - p.push(plugin); + PluginManager.providers[plugin.type].push(plugin); } static async getClient( type: T, ): Promise { - const provider = PluginManager.providers.get(type); - if (!provider) { + const providers: TPlugin[] = PluginManager.providers[type]; + if (providers.length === 0) { return null; } - return await provider[provider.length - 1].provider.getClient(); + return await providers[providers.length - 1]!.provider.getClient(); } static isRegistered(type: T): boolean { - return !!PluginManager.providers.get(type); + return PluginManager.providers[type].length > 0; } static logAllPlugins() { logger.info("Plugins (Last one wins):"); 
for (const type of Object.values(PluginType)) { logger.info(` ${type}:`); - const plugins = PluginManager.providers.get(type); + const plugins = PluginManager.providers[type]; if (!plugins) { logger.info(" - None"); continue; diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts new file mode 100644 index 00000000..dfe3b31a --- /dev/null +++ b/packages/shared/queueing.ts @@ -0,0 +1,84 @@ +import { ZodType } from "zod"; + +import { PluginManager, PluginType } from "./plugins"; + +export interface EnqueueOptions { + numRetries?: number; + idempotencyKey?: string; + priority?: number; + delayMs?: number; +} + +export interface QueueOptions { + defaultJobArgs: { + numRetries: number; + }; + keepFailedJobs: boolean; +} + +export interface DequeuedJob { + id: string; + data: T; + priority: number; + runNumber: number; + abortSignal: AbortSignal; +} + +export interface DequeuedJobError { + id: string; + data?: T; + priority: number; + error: Error; + runNumber: number; + numRetriesLeft: number; +} + +export interface RunnerFuncs { + run: (job: DequeuedJob) => Promise; + onComplete?: (job: DequeuedJob) => Promise; + onError?: (job: DequeuedJobError) => Promise; +} + +export interface RunnerOptions { + pollIntervalMs?: number; + timeoutSecs: number; + concurrency: number; + validator?: ZodType; +} + +export interface Queue { + name(): string; + enqueue(payload: T, options?: EnqueueOptions): Promise; + stats(): Promise<{ + pending: number; + pending_retry: number; + running: number; + failed: number; + }>; + cancelAllNonRunning?(): Promise; +} + +export interface Runner<_T> { + run(): Promise; + stop(): void; + runUntilEmpty?(): Promise; +} + +export interface QueueClient { + init(): Promise; + createQueue(name: string, options: QueueOptions): Queue; + createRunner( + queue: Queue, + funcs: RunnerFuncs, + opts: RunnerOptions, + ): Runner; + shutdown?(): Promise; +} + +export async function getQueueClient(): Promise { + const client = await PluginManager.getClient(PluginType.Queue); + if (!client) { + throw new Error("Failed to get queue client"); + } + return client; +} diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts deleted file mode 100644 index cf8920e1..00000000 --- a/packages/shared/queues.ts +++ /dev/null @@ -1,222 +0,0 @@ -import path from "node:path"; -import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque"; -import { z } from "zod"; - -import serverConfig from "./config"; -import { zRuleEngineEventSchema } from "./types/rules"; - -const QUEUE_DB_PATH = path.join(serverConfig.dataDir, "queue.db"); - -const queueDB = buildDBClient(QUEUE_DB_PATH, { - walEnabled: serverConfig.database.walMode, -}); - -export function runQueueDBMigrations() { - migrateDB(queueDB); -} - -// Link Crawler -export const zCrawlLinkRequestSchema = z.object({ - bookmarkId: z.string(), - runInference: z.boolean().optional(), - archiveFullPage: z.boolean().optional().default(false), -}); -export type ZCrawlLinkRequest = z.input; - -export const LinkCrawlerQueue = new SqliteQueue( - "link_crawler_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 5, - }, - keepFailedJobs: false, - }, -); - -// Inference Worker -export const zOpenAIRequestSchema = z.object({ - bookmarkId: z.string(), - type: z.enum(["summarize", "tag"]).default("tag"), -}); -export type ZOpenAIRequest = z.infer; - -export const OpenAIQueue = new SqliteQueue( - "openai_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, - }, -); - -// Search Indexing 
Worker -export const zSearchIndexingRequestSchema = z.object({ - bookmarkId: z.string(), - type: z.enum(["index", "delete"]), -}); -export type ZSearchIndexingRequest = z.infer< - typeof zSearchIndexingRequestSchema ->; -export const SearchIndexingQueue = new SqliteQueue( - "searching_indexing", - queueDB, - { - defaultJobArgs: { - numRetries: 5, - }, - keepFailedJobs: false, - }, -); - -// Tidy Assets Worker -export const zTidyAssetsRequestSchema = z.object({ - cleanDanglingAssets: z.boolean().optional().default(false), - syncAssetMetadata: z.boolean().optional().default(false), -}); -export type ZTidyAssetsRequest = z.infer; -export const TidyAssetsQueue = new SqliteQueue( - "tidy_assets_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 1, - }, - keepFailedJobs: false, - }, -); - -export async function triggerSearchReindex( - bookmarkId: string, - opts?: EnqueueOptions, -) { - await SearchIndexingQueue.enqueue( - { - bookmarkId, - type: "index", - }, - opts, - ); -} - -export const zvideoRequestSchema = z.object({ - bookmarkId: z.string(), - url: z.string(), -}); -export type ZVideoRequest = z.infer; - -export const VideoWorkerQueue = new SqliteQueue( - "video_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 5, - }, - keepFailedJobs: false, - }, -); - -// Feed Worker -export const zFeedRequestSchema = z.object({ - feedId: z.string(), -}); -export type ZFeedRequestSchema = z.infer; - -export const FeedQueue = new SqliteQueue( - "feed_queue", - queueDB, - { - defaultJobArgs: { - // One retry is enough for the feed queue given that it's periodic - numRetries: 1, - }, - keepFailedJobs: false, - }, -); - -// Preprocess Assets -export const zAssetPreprocessingRequestSchema = z.object({ - bookmarkId: z.string(), - fixMode: z.boolean().optional().default(false), -}); -export type AssetPreprocessingRequest = z.infer< - typeof zAssetPreprocessingRequestSchema ->; -export const AssetPreprocessingQueue = - new SqliteQueue( - "asset_preprocessing_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, - }, - ); - -// Webhook worker -export const zWebhookRequestSchema = z.object({ - bookmarkId: z.string(), - operation: z.enum(["crawled", "created", "edited", "ai tagged", "deleted"]), - userId: z.string().optional(), -}); -export type ZWebhookRequest = z.infer; -export const WebhookQueue = new SqliteQueue( - "webhook_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, - }, -); - -export async function triggerWebhook( - bookmarkId: string, - operation: ZWebhookRequest["operation"], - userId?: string, - opts?: EnqueueOptions, -) { - await WebhookQueue.enqueue( - { - bookmarkId, - userId, - operation, - }, - opts, - ); -} - -// RuleEngine worker -export const zRuleEngineRequestSchema = z.object({ - bookmarkId: z.string(), - events: z.array(zRuleEngineEventSchema), -}); -export type ZRuleEngineRequest = z.infer; -export const RuleEngineQueue = new SqliteQueue( - "rule_engine_queue", - queueDB, - { - defaultJobArgs: { - numRetries: 1, - }, - keepFailedJobs: false, - }, -); - -export async function triggerRuleEngineOnEvent( - bookmarkId: string, - events: z.infer[], - opts?: EnqueueOptions, -) { - await RuleEngineQueue.enqueue( - { - events, - bookmarkId, - }, - opts, - ); -} diff --git a/packages/trpc/lib/__tests__/ruleEngine.test.ts b/packages/trpc/lib/__tests__/ruleEngine.test.ts index cbb4b978..a108ede7 100644 --- a/packages/trpc/lib/__tests__/ruleEngine.test.ts +++ b/packages/trpc/lib/__tests__/ruleEngine.test.ts @@ 
-15,7 +15,7 @@ import { tagsOnBookmarks, users, } from "@karakeep/db/schema"; -import { LinkCrawlerQueue } from "@karakeep/shared/queues"; +import { LinkCrawlerQueue } from "@karakeep/shared-server"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import { RuleEngineAction, @@ -29,7 +29,7 @@ import { TestDB } from "../../testUtils"; import { RuleEngine } from "../ruleEngine"; // Mock the queue -vi.mock("@karakeep/shared/queues", () => ({ +vi.mock("@karakeep/shared-server", () => ({ LinkCrawlerQueue: { enqueue: vi.fn(), }, diff --git a/packages/trpc/lib/ruleEngine.ts b/packages/trpc/lib/ruleEngine.ts index 0bef8cdc..2d5deae6 100644 --- a/packages/trpc/lib/ruleEngine.ts +++ b/packages/trpc/lib/ruleEngine.ts @@ -2,7 +2,7 @@ import deepEql from "deep-equal"; import { and, eq } from "drizzle-orm"; import { bookmarks, tagsOnBookmarks } from "@karakeep/db/schema"; -import { LinkCrawlerQueue } from "@karakeep/shared/queues"; +import { LinkCrawlerQueue } from "@karakeep/shared-server"; import { RuleEngineAction, RuleEngineCondition, diff --git a/packages/trpc/models/lists.ts b/packages/trpc/models/lists.ts index 39d78ac1..c0e17bfc 100644 --- a/packages/trpc/models/lists.ts +++ b/packages/trpc/models/lists.ts @@ -6,7 +6,7 @@ import { z } from "zod"; import { SqliteError } from "@karakeep/db"; import { bookmarkLists, bookmarksInLists } from "@karakeep/db/schema"; -import { triggerRuleEngineOnEvent } from "@karakeep/shared/queues"; +import { triggerRuleEngineOnEvent } from "@karakeep/shared-server"; import { parseSearchQuery } from "@karakeep/shared/searchQueryParser"; import { ZSortOrder } from "@karakeep/shared/types/bookmarks"; import { diff --git a/packages/trpc/models/tags.ts b/packages/trpc/models/tags.ts index 79cd855b..a91dcbdf 100644 --- a/packages/trpc/models/tags.ts +++ b/packages/trpc/models/tags.ts @@ -5,7 +5,7 @@ import { z } from "zod"; import type { ZAttachedByEnum } from "@karakeep/shared/types/tags"; import { SqliteError } from "@karakeep/db"; import { bookmarkTags, tagsOnBookmarks } from "@karakeep/db/schema"; -import { triggerSearchReindex } from "@karakeep/shared/queues"; +import { triggerSearchReindex } from "@karakeep/shared-server"; import { zCreateTagRequestSchema, zGetTagResponseSchema, diff --git a/packages/trpc/package.json b/packages/trpc/package.json index c4e16675..d1896a0b 100644 --- a/packages/trpc/package.json +++ b/packages/trpc/package.json @@ -20,7 +20,6 @@ "bcryptjs": "^2.4.3", "deep-equal": "^2.2.3", "drizzle-orm": "^0.44.2", - "liteque": "^0.6.0", "nodemailer": "^7.0.4", "prom-client": "^15.1.3", "stripe": "^18.3.0", diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts index e005c3dd..25425eaf 100644 --- a/packages/trpc/routers/admin.ts +++ b/packages/trpc/routers/admin.ts @@ -13,7 +13,7 @@ import { triggerSearchReindex, VideoWorkerQueue, WebhookQueue, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; import { getSearchClient } from "@karakeep/shared/search"; import { resetPasswordSchema, diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index efd295f7..3399bf19 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -1,6 +1,5 @@ import { experimental_trpcMiddleware, TRPCError } from "@trpc/server"; import { and, eq, gt, inArray, lt, or } from "drizzle-orm"; -import { EnqueueOptions } from "liteque"; import invariant from "tiny-invariant"; import { z } from "zod"; @@ -21,23 +20,24 @@ import { customPrompts, tagsOnBookmarks, } from 
"@karakeep/db/schema"; -import { QuotaService } from "@karakeep/shared-server"; -import { - deleteAsset, - SUPPORTED_BOOKMARK_ASSET_TYPES, -} from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import { InferenceClientFactory } from "@karakeep/shared/inference"; -import { buildSummaryPrompt } from "@karakeep/shared/prompts"; import { AssetPreprocessingQueue, LinkCrawlerQueue, OpenAIQueue, + QuotaService, SearchIndexingQueue, triggerRuleEngineOnEvent, triggerSearchReindex, triggerWebhook, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; +import { + deleteAsset, + SUPPORTED_BOOKMARK_ASSET_TYPES, +} from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import { InferenceClientFactory } from "@karakeep/shared/inference"; +import { buildSummaryPrompt } from "@karakeep/shared/prompts"; +import { EnqueueOptions } from "@karakeep/shared/queueing"; import { FilterQuery, getSearchClient } from "@karakeep/shared/search"; import { parseSearchQuery } from "@karakeep/shared/searchQueryParser"; import { diff --git a/packages/trpc/routers/feeds.ts b/packages/trpc/routers/feeds.ts index 27eefdf1..57c88084 100644 --- a/packages/trpc/routers/feeds.ts +++ b/packages/trpc/routers/feeds.ts @@ -1,6 +1,6 @@ import { z } from "zod"; -import { FeedQueue } from "@karakeep/shared/queues"; +import { FeedQueue } from "@karakeep/shared-server"; import { zFeedSchema, zNewFeedSchema, diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts index 9aef42ef..c6d5c94c 100644 --- a/packages/trpc/stats.ts +++ b/packages/trpc/stats.ts @@ -13,7 +13,7 @@ import { TidyAssetsQueue, VideoWorkerQueue, WebhookQueue, -} from "@karakeep/shared/queues"; +} from "@karakeep/shared-server"; // Queue metrics const queuePendingJobsGauge = new Gauge({ diff --git a/packages/trpc/testUtils.ts b/packages/trpc/testUtils.ts index 1cc4e727..b8fe3c30 100644 --- a/packages/trpc/testUtils.ts +++ b/packages/trpc/testUtils.ts @@ -77,20 +77,25 @@ export async function buildTestContext( export function defaultBeforeEach(seedDB = true) { return async (context: object) => { - vi.mock("@karakeep/shared/queues", () => ({ - LinkCrawlerQueue: { - enqueue: vi.fn(), - }, - OpenAIQueue: { - enqueue: vi.fn(), - }, - SearchIndexingQueue: { - enqueue: vi.fn(), - }, - triggerRuleEngineOnEvent: vi.fn(), - triggerSearchReindex: vi.fn(), - triggerWebhook: vi.fn(), - })); + vi.mock("@karakeep/shared-server", async (original) => { + const mod = + (await original()) as typeof import("@karakeep/shared-server"); + return { + ...mod, + LinkCrawlerQueue: { + enqueue: vi.fn(), + }, + OpenAIQueue: { + enqueue: vi.fn(), + }, + SearchIndexingQueue: { + enqueue: vi.fn(), + }, + triggerRuleEngineOnEvent: vi.fn(), + triggerSearchReindex: vi.fn(), + triggerWebhook: vi.fn(), + }; + }); Object.assign(context, await buildTestContext(seedDB)); }; } diff --git a/packages/trpc/vitest.config.ts b/packages/trpc/vitest.config.ts index 5af4ad16..41fd70c4 100644 --- a/packages/trpc/vitest.config.ts +++ b/packages/trpc/vitest.config.ts @@ -10,9 +10,5 @@ export default defineConfig({ alias: { "@/*": "./*", }, - deps: { - // TODO: this need to be fixed - inline: ["liteque"], - }, }, }); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 55040814..dc31796c 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -832,9 +832,6 @@ importers: jsdom: specifier: ^24.0.0 version: 24.1.3 - liteque: - specifier: ^0.6.0 - version: 
0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) metascraper: specifier: ^5.46.18 version: 5.47.1 @@ -1139,6 +1136,28 @@ importers: specifier: ^4.8.1 version: 4.20.3 + packages/plugins-queue-liteque: + dependencies: + '@karakeep/shared': + specifier: workspace:* + version: link:../shared + liteque: + specifier: ^0.6.0 + version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) + devDependencies: + '@karakeep/prettier-config': + specifier: workspace:^0.1.0 + version: link:../../tooling/prettier + '@karakeep/tsconfig': + specifier: workspace:^0.1.0 + version: link:../../tooling/typescript + vite-tsconfig-paths: + specifier: ^4.3.1 + version: 4.3.2(typescript@5.8.3)(vite@7.0.6(@types/node@22.15.30)(jiti@2.4.2)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0)) + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.15.30)(happy-dom@17.4.9)(jiti@2.4.2)(jsdom@26.1.0)(lightningcss@1.30.1)(sass@1.89.1)(terser@5.41.0)(tsx@4.20.3)(yaml@2.8.0) + packages/plugins-search-meilisearch: dependencies: '@karakeep/shared': @@ -1203,9 +1222,6 @@ importers: js-tiktoken: specifier: ^1.0.20 version: 1.0.20 - liteque: - specifier: ^0.6.0 - version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) nodemailer: specifier: ^7.0.4 version: 7.0.4 @@ -1283,6 +1299,9 @@ importers: '@karakeep/db': specifier: workspace:^0.1.0 version: link:../db + '@karakeep/plugins-queue-liteque': + specifier: workspace:^0.1.0 + version: link:../plugins-queue-liteque '@karakeep/plugins-search-meilisearch': specifier: workspace:^0.1.0 version: link:../plugins-search-meilisearch @@ -1320,9 +1339,6 @@ importers: drizzle-orm: specifier: ^0.44.2 version: 0.44.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@11.3.0)(gel@2.1.0)(kysely@0.28.5) - liteque: - specifier: ^0.6.0 - version: 0.6.0(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.11)(better-sqlite3@11.3.0)(kysely@0.28.5)(react@19.1.0) nodemailer: specifier: ^7.0.4 version: 7.0.4 -- cgit v1.2.3-70-g09d2
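
For readers skimming the patch, here is a minimal sketch of how the new queueing abstraction is consumed once callsites stop depending on liteque directly. It relies only on interfaces this patch adds (`getQueueClient`, `QueueClient.createQueue` / `createRunner` in `packages/shared/queueing.ts`, and the liteque provider registered through `loadAllPlugins()`); the `demo_queue` name and the `DemoRequest` payload are illustrative stand-ins, not part of the patch.

```ts
// Hypothetical consumer of the queue plugin interface introduced in this patch.
// The queue name and payload shape below are made up for illustration; the
// real queues live in packages/shared-server/src/queues.ts.
import { loadAllPlugins } from "@karakeep/shared-server";
import type { DequeuedJob } from "@karakeep/shared/queueing";
import { getQueueClient } from "@karakeep/shared/queueing";

interface DemoRequest {
  bookmarkId: string;
}

async function main() {
  // Register all providers first (this pulls in @karakeep/plugins-queue-liteque);
  // getQueueClient() throws if no PluginType.Queue provider was registered.
  await loadAllPlugins();
  const client = await getQueueClient();

  // Queues are created through the provider-agnostic client instead of
  // `new SqliteQueue(...)` from liteque.
  const DemoQueue = client.createQueue<DemoRequest>("demo_queue", {
    defaultJobArgs: { numRetries: 3 },
    keepFailedJobs: false,
  });

  // Runners follow the same shape the reworked workers now use.
  const runner = client.createRunner<DemoRequest>(
    DemoQueue,
    {
      run: async (job: DequeuedJob<DemoRequest>) => {
        console.log(`processing ${job.data.bookmarkId} (job ${job.id})`);
      },
    },
    { concurrency: 1, pollIntervalMs: 1000, timeoutSecs: 30 },
  );

  await DemoQueue.enqueue({ bookmarkId: "bm_123" });
  await runner.run();
}

void main();
```

In practice application code keeps importing the ready-made queues (`LinkCrawlerQueue`, `OpenAIQueue`, ...) from `@karakeep/shared-server`, which performs the plugin loading and queue creation once at module initialization; the sketch above only spells out the indirection that those exports now hide behind the queue plugin.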