From 5576361a1afa280abb256cafe17b7a140ee42adf Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 5 Jul 2025 19:57:01 +0000 Subject: feat(workers): Allow custmoizing max parallelism for a bunch of workers. Fixes #724 --- apps/web/app/dashboard/layout.tsx | 2 +- apps/workers/workers/assetPreprocessingWorker.ts | 2 +- apps/workers/workers/inference/inferenceWorker.ts | 2 +- apps/workers/workers/ruleEngineWorker.ts | 3 ++- apps/workers/workers/searchWorker.ts | 3 ++- apps/workers/workers/webhookWorker.ts | 2 +- docs/docs/03-configuration.md | 12 ++++++++++ packages/shared/config.ts | 28 ++++++++++++++++++----- packages/shared/search.ts | 6 ++--- 9 files changed, 45 insertions(+), 15 deletions(-) diff --git a/apps/web/app/dashboard/layout.tsx b/apps/web/app/dashboard/layout.tsx index c4a53e4b..670286ea 100644 --- a/apps/web/app/dashboard/layout.tsx +++ b/apps/web/app/dashboard/layout.tsx @@ -43,7 +43,7 @@ export default async function Dashboard({ icon: , path: "/dashboard/bookmarks", }, - serverConfig.meilisearch + serverConfig.search.meilisearch ? [ { name: t("common.search"), diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index 0c0b7aec..d43163ac 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -43,7 +43,7 @@ export class AssetPreprocessingWorker { }, }, { - concurrency: 1, + concurrency: serverConfig.assetPreprocessing.numWorkers, pollIntervalMs: 1000, timeoutSecs: 30, }, diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts index f7492c8b..0dba6f58 100644 --- a/apps/workers/workers/inference/inferenceWorker.ts +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -58,7 +58,7 @@ export class OpenAiWorker { }, }, { - concurrency: 1, + concurrency: serverConfig.inference.numWorkers, pollIntervalMs: 1000, timeoutSecs: serverConfig.inference.jobTimeoutSec, }, diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts index 427cc383..39f0a523 100644 --- a/apps/workers/workers/ruleEngineWorker.ts +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -5,6 +5,7 @@ import { buildImpersonatingAuthedContext } from "trpc"; import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { RuleEngineQueue, @@ -33,7 +34,7 @@ export class RuleEngineWorker { }, }, { - concurrency: 1, + concurrency: serverConfig.ruleEngine.numWorkers, pollIntervalMs: 1000, timeoutSecs: 10, validator: zRuleEngineRequestSchema, diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts index e7b827a9..13243152 100644 --- a/apps/workers/workers/searchWorker.ts +++ b/apps/workers/workers/searchWorker.ts @@ -4,6 +4,7 @@ import { DequeuedJob, Runner } from "liteque"; import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; import { db } from "@karakeep/db"; import { bookmarks } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { SearchIndexingQueue, @@ -32,7 +33,7 @@ export class SearchIndexingWorker { }, }, { - concurrency: 1, + concurrency: serverConfig.search.numWorkers, pollIntervalMs: 1000, timeoutSecs: 30, }, diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts index f42266dd..504f7f9b 100644 --- a/apps/workers/workers/webhookWorker.ts +++ b/apps/workers/workers/webhookWorker.ts @@ -33,7 +33,7 @@ export class WebhookWorker { }, }, { - concurrency: 1, + concurrency: serverConfig.webhook.numWorkers, pollIntervalMs: 1000, timeoutSecs: serverConfig.webhook.timeoutSec * diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md index 156632d7..d8981843 100644 --- a/docs/docs/03-configuration.md +++ b/docs/docs/03-configuration.md @@ -117,6 +117,18 @@ Either `OPENAI_API_KEY` or `OLLAMA_BASE_URL` need to be set for automatic taggin | CRAWLER_ENABLE_ADBLOCKER | No | true | Whether to enable an adblocker in the crawler or not. If you're facing troubles downloading the adblocking lists on worker startup, you can disable this. | | CRAWLER_YTDLP_ARGS | No | [] | Include additional yt-dlp arguments to be passed at crawl time separated by %%: https://github.com/yt-dlp/yt-dlp?tab=readme-ov-file#general-options | +## Worker Concurrency Configs + +These settings control the number of concurrent workers for different background processing tasks. Increasing these values can improve throughput but will consume more system resources. + +| Name | Required | Default | Description | +| ------------------------------- | -------- | ------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| INFERENCE_NUM_WORKERS | No | 1 | Number of concurrent workers for AI inference tasks (tagging and summarization). Increase this if you have multiple AI inference requests and want to process them in parallel. | +| SEARCH_NUM_WORKERS | No | 1 | Number of concurrent workers for search indexing tasks. Increase this if you have a high volume of content being indexed for search. | +| WEBHOOK_NUM_WORKERS | No | 1 | Number of concurrent workers for webhook delivery. Increase this if you have multiple webhook endpoints or high webhook traffic. | +| ASSET_PREPROCESSING_NUM_WORKERS | No | 1 | Number of concurrent workers for asset preprocessing tasks (image processing, OCR, etc.). Increase this if you have many images or documents that need processing. | +| RULE_ENGINE_NUM_WORKERS | No | 1 | Number of concurrent workers for rule engine processing. Increase this if you have complex automation rules that need to be processed quickly. | + ## OCR Configs Karakeep uses [tesseract.js](https://github.com/naptha/tesseract.js) to extract text from images. diff --git a/packages/shared/config.ts b/packages/shared/config.ts index 715c2848..9294e154 100644 --- a/packages/shared/config.ts +++ b/packages/shared/config.ts @@ -57,6 +57,11 @@ const allEnv = z.object({ CRAWLER_JOB_TIMEOUT_SEC: z.coerce.number().default(60), CRAWLER_NAVIGATE_TIMEOUT_SEC: z.coerce.number().default(30), CRAWLER_NUM_WORKERS: z.coerce.number().default(1), + INFERENCE_NUM_WORKERS: z.coerce.number().default(1), + SEARCH_NUM_WORKERS: z.coerce.number().default(1), + WEBHOOK_NUM_WORKERS: z.coerce.number().default(1), + ASSET_PREPROCESSING_NUM_WORKERS: z.coerce.number().default(1), + RULE_ENGINE_NUM_WORKERS: z.coerce.number().default(1), CRAWLER_DOWNLOAD_BANNER_IMAGE: stringBool("true"), CRAWLER_STORE_SCREENSHOT: stringBool("true"), CRAWLER_FULL_PAGE_SCREENSHOT: stringBool("false"), @@ -124,6 +129,7 @@ const serverConfigSchema = allEnv.transform((val) => { }, }, inference: { + numWorkers: val.INFERENCE_NUM_WORKERS, jobTimeoutSec: val.INFERENCE_JOB_TIMEOUT_SEC, fetchTimeoutSec: val.INFERENCE_FETCH_TIMEOUT_SEC, openAIApiKey: val.OPENAI_API_KEY, @@ -170,12 +176,15 @@ const serverConfigSchema = allEnv.transform((val) => { cacheDir: val.OCR_CACHE_DIR, confidenceThreshold: val.OCR_CONFIDENCE_THRESHOLD, }, - meilisearch: val.MEILI_ADDR - ? { - address: val.MEILI_ADDR, - key: val.MEILI_MASTER_KEY, - } - : undefined, + search: { + numWorkers: val.SEARCH_NUM_WORKERS, + meilisearch: val.MEILI_ADDR + ? { + address: val.MEILI_ADDR, + key: val.MEILI_MASTER_KEY, + } + : undefined, + }, logLevel: val.LOG_LEVEL, demoMode: val.DEMO_MODE ? { @@ -192,6 +201,13 @@ const serverConfigSchema = allEnv.transform((val) => { webhook: { timeoutSec: val.WEBHOOK_TIMEOUT_SEC, retryTimes: val.WEBHOOK_RETRY_TIMES, + numWorkers: val.WEBHOOK_NUM_WORKERS, + }, + assetPreprocessing: { + numWorkers: val.ASSET_PREPROCESSING_NUM_WORKERS, + }, + ruleEngine: { + numWorkers: val.RULE_ENGINE_NUM_WORKERS, }, assetStore: { type: val.ASSET_STORE_S3_ENDPOINT diff --git a/packages/shared/search.ts b/packages/shared/search.ts index 840fe27f..2c6904b2 100644 --- a/packages/shared/search.ts +++ b/packages/shared/search.ts @@ -28,10 +28,10 @@ export type ZBookmarkIdx = z.infer; let searchClient: MeiliSearch | undefined; -if (serverConfig.meilisearch) { +if (serverConfig.search.meilisearch) { searchClient = new MeiliSearch({ - host: serverConfig.meilisearch.address, - apiKey: serverConfig.meilisearch.key, + host: serverConfig.search.meilisearch.address, + apiKey: serverConfig.search.meilisearch.key, }); } -- cgit v1.2.3-70-g09d2