From 492b15203807b4ceb00af4b301958344cc5a668f Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 7 Sep 2025 15:32:24 +0100 Subject: feat(workers): add worker enable/disable lists (#1885) --- apps/workers/index.ts | 93 +++++++++++++++++++++++-------------------- docs/docs/03-configuration.md | 2 + packages/shared/config.ts | 20 ++++++++++ 3 files changed, 71 insertions(+), 44 deletions(-) diff --git a/apps/workers/index.ts b/apps/workers/index.ts index f34e4722..cd860fc0 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -18,65 +18,70 @@ import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; +const workerBuilders = { + crawler: () => CrawlerWorker.build(), + inference: () => OpenAiWorker.build(), + search: () => SearchIndexingWorker.build(), + tidyAssets: () => TidyAssetsWorker.build(), + video: () => VideoWorker.build(), + feed: () => FeedWorker.build(), + assetPreprocessing: () => AssetPreprocessingWorker.build(), + webhook: () => WebhookWorker.build(), + ruleEngine: () => RuleEngineWorker.build(), +} as const; + +type WorkerName = keyof typeof workerBuilders; +const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); +const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); + +function isWorkerEnabled(name: WorkerName) { + if (enabledWorkers.size > 0 && !enabledWorkers.has(name)) { + return false; + } + if (disabledWorkers.has(name)) { + return false; + } + return true; +} + async function main() { await loadAllPlugins(); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); - const [ - crawler, - inference, - search, - tidyAssets, - video, - feed, - assetPreprocessing, - webhook, - ruleEngine, - httpServer, - ] = [ - await CrawlerWorker.build(), - OpenAiWorker.build(), - SearchIndexingWorker.build(), - TidyAssetsWorker.build(), - VideoWorker.build(), - FeedWorker.build(), - AssetPreprocessingWorker.build(), - WebhookWorker.build(), - RuleEngineWorker.build(), - buildServer(), - ]; - FeedRefreshingWorker.start(); + const httpServer = buildServer(); + + const workers = await Promise.all( + Object.entries(workerBuilders) + .filter(([name]) => isWorkerEnabled(name as WorkerName)) + .map(async ([name, builder]) => ({ + name: name as WorkerName, + worker: await builder(), + })), + ); + + if (workers.some((w) => w.name === "feed")) { + FeedRefreshingWorker.start(); + } await Promise.any([ Promise.all([ - crawler.run(), - inference.run(), - search.run(), - tidyAssets.run(), - video.run(), - feed.run(), - assetPreprocessing.run(), - webhook.run(), - ruleEngine.run(), + ...workers.map(({ worker }) => worker.run()), httpServer.serve(), ]), shutdownPromise, ]); + logger.info( - "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", + `Shutting down ${workers.map((w) => w.name).join(", ")} workers ...`, ); - FeedRefreshingWorker.stop(); - crawler.stop(); - inference.stop(); - search.stop(); - tidyAssets.stop(); - video.stop(); - feed.stop(); - assetPreprocessing.stop(); - webhook.stop(); - ruleEngine.stop(); + if (workers.some((w) => w.name === "feed")) { + FeedRefreshingWorker.stop(); + } + for (const { worker } of workers) { + worker.stop(); + } await httpServer.stop(); process.exit(0); } diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md index bd53f1bb..aae1ffa3 100644 --- a/docs/docs/03-configuration.md +++ b/docs/docs/03-configuration.md @@ -6,6 +6,8 @@ The app is mainly configured by environment variables. All the used environment | ------------------------------- | ------------------------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | | PORT | No | 3000 | The port on which the web server will listen. DON'T CHANGE THIS IF YOU'RE USING DOCKER, instead changed the docker bound external port. | | WORKERS_PORT | No | 0 (Random Port) | The port on which the worker will export its prometheus metrics on `/metrics`. By default it's a random unused port. If you want to utilize those metrics, fix the port to a value (and export it in docker if you're using docker). | +| WORKERS_ENABLED_WORKERS | No | Not set | Comma separated list of worker names to enable. If set, only these workers will run. Valid values: crawler,inference,search,tidyAssets,video,feed,assetPreprocessing,webhook,ruleEngine. | +| WORKERS_DISABLED_WORKERS | No | Not set | Comma separated list of worker names to disable. Takes precedence over `WORKERS_ENABLED_WORKERS`. | | DATA_DIR | Yes | Not set | The path for the persistent data directory. This is where the db lives. Assets are stored here by default unless `ASSETS_DIR` is set. | | ASSETS_DIR | No | Not set | The path where crawled assets will be stored. If not set, defaults to `${DATA_DIR}/assets`. | | NEXTAUTH_URL | Yes | Not set | Should point to the address of your server. The app will function without it, but will redirect you to wrong addresses on signout for example. | diff --git a/packages/shared/config.ts b/packages/shared/config.ts index 15c2b4a5..3cc65f4c 100644 --- a/packages/shared/config.ts +++ b/packages/shared/config.ts @@ -20,6 +20,24 @@ const allEnv = z.object({ PORT: z.coerce.number().default(3000), WORKERS_HOST: z.string().default("127.0.0.1"), WORKERS_PORT: z.coerce.number().default(0), + WORKERS_ENABLED_WORKERS: z + .string() + .default("") + .transform((val) => + val + .split(",") + .map((w) => w.trim()) + .filter((w) => w), + ), + WORKERS_DISABLED_WORKERS: z + .string() + .default("") + .transform((val) => + val + .split(",") + .map((w) => w.trim()) + .filter((w) => w), + ), API_URL: z.string().url().default("http://localhost:3000"), NEXTAUTH_URL: z .string() @@ -153,6 +171,8 @@ const serverConfigSchema = allEnv.transform((val, ctx) => { workers: { host: val.WORKERS_HOST, port: val.WORKERS_PORT, + enabledWorkers: val.WORKERS_ENABLED_WORKERS, + disabledWorkers: val.WORKERS_DISABLED_WORKERS, }, apiUrl: val.API_URL, publicUrl: val.NEXTAUTH_URL, -- cgit v1.2.3-70-g09d2