aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-14 18:16:40 +0000
committerMohamed Bassem <me@mbassem.com>2025-09-14 18:16:57 +0000
commit8d32055485858210252096483bb20533dc8bdf60 (patch)
treece8a1373411d1ce40aa0dbe6c37e707f0dbf4c98 /apps/workers
parent6ba61b46154e076fca47d3841b158105dbeeef80 (diff)
downloadkarakeep-8d32055485858210252096483bb20533dc8bdf60.tar.zst
refactor: Move callsites to liteque to be behind a plugin
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/index.ts3
-rw-r--r--apps/workers/package.json1
-rw-r--r--apps/workers/workers/assetPreprocessingWorker.ts70
-rw-r--r--apps/workers/workers/crawlerWorker.ts28
-rw-r--r--apps/workers/workers/feedWorker.ts11
-rw-r--r--apps/workers/workers/inference/inferenceWorker.ts10
-rw-r--r--apps/workers/workers/inference/summarize.ts4
-rw-r--r--apps/workers/workers/inference/tagging.ts14
-rw-r--r--apps/workers/workers/ruleEngineWorker.ts14
-rw-r--r--apps/workers/workers/searchWorker.ts59
-rw-r--r--apps/workers/workers/tidyAssetsWorker.ts12
-rw-r--r--apps/workers/workers/videoWorker.ts19
-rw-r--r--apps/workers/workers/webhookWorker.ts12
13 files changed, 134 insertions, 123 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index cd860fc0..578ff6c8 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,10 +2,9 @@ import "dotenv/config";
import { buildServer } from "server";
-import { loadAllPlugins } from "@karakeep/shared-server";
+import { loadAllPlugins, runQueueDBMigrations } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
-import { runQueueDBMigrations } from "@karakeep/shared/queues";
import { shutdownPromise } from "./exit";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 6de801dd..030250e0 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -24,7 +24,6 @@
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.6",
"jsdom": "^24.0.0",
- "liteque": "^0.6.0",
"metascraper": "^5.46.18",
"metascraper-amazon": "^5.46.18",
"metascraper-author": "5.46.18",
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index d2df1724..2dcec59b 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,12 +1,11 @@
import os from "os";
import { eq } from "drizzle-orm";
-import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
-import type { AssetPreprocessingRequest } from "@karakeep/shared/queues";
+import type { AssetPreprocessingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import {
assets,
@@ -14,44 +13,53 @@ import {
bookmarkAssets,
bookmarks,
} from "@karakeep/db/schema";
-import { QuotaService, StorageQuotaError } from "@karakeep/shared-server";
-import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
AssetPreprocessingQueue,
OpenAIQueue,
+ QuotaService,
+ StorageQuotaError,
triggerSearchReindex,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import {
+ DequeuedJob,
+ EnqueueOptions,
+ getQueueClient,
+} from "@karakeep/shared/queueing";
export class AssetPreprocessingWorker {
- static build() {
+ static async build() {
logger.info("Starting asset preprocessing worker ...");
- const worker = new Runner<AssetPreprocessingRequest>(
- AssetPreprocessingQueue,
- {
- run: run,
- onComplete: async (job) => {
- workerStatsCounter.labels("assetPreprocessing", "completed").inc();
- const jobId = job.id;
- logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
- return Promise.resolve();
+ const worker =
+ (await getQueueClient())!.createRunner<AssetPreprocessingRequest>(
+ AssetPreprocessingQueue,
+ {
+ run: run,
+ onComplete: async (job) => {
+ workerStatsCounter.labels("assetPreprocessing", "completed").inc();
+ const jobId = job.id;
+ logger.info(
+ `[assetPreprocessing][${jobId}] Completed successfully`,
+ );
+ return Promise.resolve();
+ },
+ onError: async (job) => {
+ workerStatsCounter.labels("assetPreProcessing", "failed").inc();
+ const jobId = job.id;
+ logger.error(
+ `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
},
- onError: async (job) => {
- workerStatsCounter.labels("assetPreProcessing", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
+ {
+ concurrency: serverConfig.assetPreprocessing.numWorkers,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
- },
- {
- concurrency: serverConfig.assetPreprocessing.numWorkers,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
+ );
return worker;
}
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 79d8e06a..baea1346 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -10,7 +10,6 @@ import { eq } from "drizzle-orm";
import { execa } from "execa";
import { exitAbortController } from "exit";
import { JSDOM, VirtualConsole } from "jsdom";
-import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import metascraper from "metascraper";
import metascraperAmazon from "metascraper-amazon";
import metascraperAuthor from "metascraper-author";
@@ -30,7 +29,7 @@ import { fetchWithProxy } from "utils";
import { getBookmarkDetails, updateAsset } from "workerUtils";
import { z } from "zod";
-import type { ZCrawlLinkRequest } from "@karakeep/shared/queues";
+import type { ZCrawlLinkRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import {
assets,
@@ -40,7 +39,16 @@ import {
bookmarks,
users,
} from "@karakeep/db/schema";
-import { QuotaService } from "@karakeep/shared-server";
+import {
+ AssetPreprocessingQueue,
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ QuotaService,
+ triggerSearchReindex,
+ triggerWebhook,
+ VideoWorkerQueue,
+ zCrawlLinkRequestSchema,
+} from "@karakeep/shared-server";
import {
ASSET_TYPES,
getAssetSize,
@@ -55,14 +63,10 @@ import {
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import {
- AssetPreprocessingQueue,
- LinkCrawlerQueue,
- OpenAIQueue,
- triggerSearchReindex,
- triggerWebhook,
- VideoWorkerQueue,
- zCrawlLinkRequestSchema,
-} from "@karakeep/shared/queues";
+ DequeuedJob,
+ EnqueueOptions,
+ getQueueClient,
+} from "@karakeep/shared/queueing";
import { tryCatch } from "@karakeep/shared/tryCatch";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -247,7 +251,7 @@ export class CrawlerWorker {
}
logger.info("Starting crawler worker ...");
- const worker = new Runner<ZCrawlLinkRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZCrawlLinkRequest>(
LinkCrawlerQueue,
{
run: runCrawler,
diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 2ece4890..048d9696 100644
--- a/apps/workers/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
@@ -1,5 +1,4 @@
import { and, eq, inArray } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import cron from "node-cron";
import Parser from "rss-parser";
@@ -7,12 +6,12 @@ import { buildImpersonatingTRPCClient } from "trpc";
import { fetchWithProxy } from "utils";
import { z } from "zod";
-import type { ZFeedRequestSchema } from "@karakeep/shared/queues";
+import type { ZFeedRequestSchema } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema";
-import { QuotaService } from "@karakeep/shared-server";
+import { FeedQueue, QuotaService } from "@karakeep/shared-server";
import logger from "@karakeep/shared/logger";
-import { FeedQueue } from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
export const FeedRefreshingWorker = cron.schedule(
@@ -46,9 +45,9 @@ export const FeedRefreshingWorker = cron.schedule(
);
export class FeedWorker {
- static build() {
+ static async build() {
logger.info("Starting feed worker ...");
- const worker = new Runner<ZFeedRequestSchema>(
+ const worker = (await getQueueClient())!.createRunner<ZFeedRequestSchema>(
FeedQueue,
{
run: run,
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
index 32de3806..065462b3 100644
--- a/apps/workers/workers/inference/inferenceWorker.ts
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -1,14 +1,14 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
-import type { ZOpenAIRequest } from "@karakeep/shared/queues";
+import type { ZOpenAIRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
+import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import { InferenceClientFactory } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
-import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { runSummarization } from "./summarize";
import { runTagging } from "./tagging";
@@ -37,9 +37,9 @@ async function attemptMarkStatus(
}
export class OpenAiWorker {
- static build() {
+ static async build() {
logger.info("Starting inference worker ...");
- const worker = new Runner<ZOpenAIRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZOpenAIRequest>(
OpenAIQueue,
{
run: runOpenAI,
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 02a7f4e0..0b25470e 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -1,13 +1,13 @@
import { and, eq } from "drizzle-orm";
-import { DequeuedJob } from "liteque";
import { db } from "@karakeep/db";
import { bookmarks, customPrompts } from "@karakeep/db/schema";
+import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import { InferenceClient } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
import { buildSummaryPrompt } from "@karakeep/shared/prompts";
-import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues";
+import { DequeuedJob } from "@karakeep/shared/queueing";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 6dd146af..0d245644 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,13 +1,12 @@
import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob, EnqueueOptions } from "liteque";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
+import type { ZOpenAIRequest } from "@karakeep/shared-server";
import type {
InferenceClient,
InferenceResponse,
} from "@karakeep/shared/inference";
-import type { ZOpenAIRequest } from "@karakeep/shared/queues";
import { db } from "@karakeep/db";
import {
bookmarks,
@@ -15,15 +14,16 @@ import {
customPrompts,
tagsOnBookmarks,
} from "@karakeep/db/schema";
-import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
-import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
import {
triggerRuleEngineOnEvent,
triggerSearchReindex,
triggerWebhook,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import { ASSET_TYPES, readAsset } from "@karakeep/shared/assetdb";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
+import { DequeuedJob, EnqueueOptions } from "@karakeep/shared/queueing";
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
const openAIResponseSchema = z.object({
diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 2a4fbb1a..37c7f595 100644
--- a/apps/workers/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
@@ -1,23 +1,23 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { buildImpersonatingAuthedContext } from "trpc";
-import type { ZRuleEngineRequest } from "@karakeep/shared/queues";
+import type { ZRuleEngineRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
RuleEngineQueue,
zRuleEngineRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine";
export class RuleEngineWorker {
- static build() {
+ static async build() {
logger.info("Starting rule engine worker ...");
- const worker = new Runner<ZRuleEngineRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZRuleEngineRequest>(
RuleEngineQueue,
{
run: runRuleEngine,
diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index 1433ef3c..8f043c1d 100644
--- a/apps/workers/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
@@ -1,16 +1,16 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
-import type { ZSearchIndexingRequest } from "@karakeep/shared/queues";
+import type { ZSearchIndexingRequest } from "@karakeep/shared-server";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
SearchIndexingQueue,
zSearchIndexingRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import {
BookmarkSearchDocument,
getSearchClient,
@@ -19,33 +19,34 @@ import {
import { Bookmark } from "@karakeep/trpc/models/bookmarks";
export class SearchIndexingWorker {
- static build() {
+ static async build() {
logger.info("Starting search indexing worker ...");
- const worker = new Runner<ZSearchIndexingRequest>(
- SearchIndexingQueue,
- {
- run: runSearchIndexing,
- onComplete: (job) => {
- workerStatsCounter.labels("search", "completed").inc();
- const jobId = job.id;
- logger.info(`[search][${jobId}] Completed successfully`);
- return Promise.resolve();
+ const worker =
+ (await getQueueClient())!.createRunner<ZSearchIndexingRequest>(
+ SearchIndexingQueue,
+ {
+ run: runSearchIndexing,
+ onComplete: (job) => {
+ workerStatsCounter.labels("search", "completed").inc();
+ const jobId = job.id;
+ logger.info(`[search][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ workerStatsCounter.labels("search", "failed").inc();
+ const jobId = job.id;
+ logger.error(
+ `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
},
- onError: (job) => {
- workerStatsCounter.labels("search", "failed").inc();
- const jobId = job.id;
- logger.error(
- `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
+ {
+ concurrency: serverConfig.search.numWorkers,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
- },
- {
- concurrency: serverConfig.search.numWorkers,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
+ );
return worker;
}
diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
index cf3e33b6..b5b95185 100644
--- a/apps/workers/workers/tidyAssetsWorker.ts
+++ b/apps/workers/workers/tidyAssetsWorker.ts
@@ -1,21 +1,21 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { assets } from "@karakeep/db/schema";
-import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
-import logger from "@karakeep/shared/logger";
import {
TidyAssetsQueue,
ZTidyAssetsRequest,
zTidyAssetsRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
export class TidyAssetsWorker {
- static build() {
+ static async build() {
logger.info("Starting tidy assets worker ...");
- const worker = new Runner<ZTidyAssetsRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZTidyAssetsRequest>(
TidyAssetsQueue,
{
run: runTidyAssets,
diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index 68be0126..a41eb069 100644
--- a/apps/workers/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -2,12 +2,17 @@ import fs from "fs";
import * as os from "os";
import path from "path";
import { execa } from "execa";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
-import { QuotaService, StorageQuotaError } from "@karakeep/shared-server";
+import {
+ QuotaService,
+ StorageQuotaError,
+ VideoWorkerQueue,
+ ZVideoRequest,
+ zvideoRequestSchema,
+} from "@karakeep/shared-server";
import {
ASSET_TYPES,
newAssetId,
@@ -16,21 +21,17 @@ import {
} from "@karakeep/shared/assetdb";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
-import {
- VideoWorkerQueue,
- ZVideoRequest,
- zvideoRequestSchema,
-} from "@karakeep/shared/queues";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
import { getBookmarkDetails, updateAsset } from "../workerUtils";
const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads");
export class VideoWorker {
- static build() {
+ static async build() {
logger.info("Starting video worker ...");
- return new Runner<ZVideoRequest>(
+ return (await getQueueClient())!.createRunner<ZVideoRequest>(
VideoWorkerQueue,
{
run: runWorker,
diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 96070e22..2bbef160 100644
--- a/apps/workers/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
@@ -1,22 +1,22 @@
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
import { workerStatsCounter } from "metrics";
import fetch from "node-fetch";
import { db } from "@karakeep/db";
import { bookmarks, webhooksTable } from "@karakeep/db/schema";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
import {
WebhookQueue,
ZWebhookRequest,
zWebhookRequestSchema,
-} from "@karakeep/shared/queues";
+} from "@karakeep/shared-server";
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
export class WebhookWorker {
- static build() {
+ static async build() {
logger.info("Starting webhook worker ...");
- const worker = new Runner<ZWebhookRequest>(
+ const worker = (await getQueueClient())!.createRunner<ZWebhookRequest>(
WebhookQueue,
{
run: runWebhook,