diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-09-07 15:32:24 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-09-07 15:32:24 +0100 |
| commit | 492b15203807b4ceb00af4b301958344cc5a668f (patch) | |
| tree | 9b91f58cc22cdb4a6a46bd61a968b38729ac8a65 /apps/workers | |
| parent | 4cc86240757376a1f5893ad3fa52f45ff8826a88 (diff) | |
| download | karakeep-492b15203807b4ceb00af4b301958344cc5a668f.tar.zst | |
feat(workers): add worker enable/disable lists (#1885)
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/index.ts | 93 |
1 files changed, 49 insertions, 44 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index f34e4722..cd860fc0 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -18,65 +18,70 @@ import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; +const workerBuilders = { + crawler: () => CrawlerWorker.build(), + inference: () => OpenAiWorker.build(), + search: () => SearchIndexingWorker.build(), + tidyAssets: () => TidyAssetsWorker.build(), + video: () => VideoWorker.build(), + feed: () => FeedWorker.build(), + assetPreprocessing: () => AssetPreprocessingWorker.build(), + webhook: () => WebhookWorker.build(), + ruleEngine: () => RuleEngineWorker.build(), +} as const; + +type WorkerName = keyof typeof workerBuilders; +const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); +const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); + +function isWorkerEnabled(name: WorkerName) { + if (enabledWorkers.size > 0 && !enabledWorkers.has(name)) { + return false; + } + if (disabledWorkers.has(name)) { + return false; + } + return true; +} + async function main() { await loadAllPlugins(); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); - const [ - crawler, - inference, - search, - tidyAssets, - video, - feed, - assetPreprocessing, - webhook, - ruleEngine, - httpServer, - ] = [ - await CrawlerWorker.build(), - OpenAiWorker.build(), - SearchIndexingWorker.build(), - TidyAssetsWorker.build(), - VideoWorker.build(), - FeedWorker.build(), - AssetPreprocessingWorker.build(), - WebhookWorker.build(), - RuleEngineWorker.build(), - buildServer(), - ]; - FeedRefreshingWorker.start(); + const httpServer = buildServer(); + + const workers = await Promise.all( + Object.entries(workerBuilders) + .filter(([name]) => isWorkerEnabled(name as WorkerName)) + .map(async ([name, builder]) => ({ + name: name as WorkerName, + worker: await builder(), + })), + ); + + if (workers.some((w) => w.name === "feed")) { + FeedRefreshingWorker.start(); + } await Promise.any([ Promise.all([ - crawler.run(), - inference.run(), - search.run(), - tidyAssets.run(), - video.run(), - feed.run(), - assetPreprocessing.run(), - webhook.run(), - ruleEngine.run(), + ...workers.map(({ worker }) => worker.run()), httpServer.serve(), ]), shutdownPromise, ]); + logger.info( - "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", + `Shutting down ${workers.map((w) => w.name).join(", ")} workers ...`, ); - FeedRefreshingWorker.stop(); - crawler.stop(); - inference.stop(); - search.stop(); - tidyAssets.stop(); - video.stop(); - feed.stop(); - assetPreprocessing.stop(); - webhook.stop(); - ruleEngine.stop(); + if (workers.some((w) => w.name === "feed")) { + FeedRefreshingWorker.stop(); + } + for (const { worker } of workers) { + worker.stop(); + } await httpServer.stop(); process.exit(0); } |
