diff options
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/index.ts | 93 |
1 files changed, 49 insertions, 44 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index f34e4722..cd860fc0 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -18,65 +18,70 @@ import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; +const workerBuilders = { + crawler: () => CrawlerWorker.build(), + inference: () => OpenAiWorker.build(), + search: () => SearchIndexingWorker.build(), + tidyAssets: () => TidyAssetsWorker.build(), + video: () => VideoWorker.build(), + feed: () => FeedWorker.build(), + assetPreprocessing: () => AssetPreprocessingWorker.build(), + webhook: () => WebhookWorker.build(), + ruleEngine: () => RuleEngineWorker.build(), +} as const; + +type WorkerName = keyof typeof workerBuilders; +const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); +const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); + +function isWorkerEnabled(name: WorkerName) { + if (enabledWorkers.size > 0 && !enabledWorkers.has(name)) { + return false; + } + if (disabledWorkers.has(name)) { + return false; + } + return true; +} + async function main() { await loadAllPlugins(); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); runQueueDBMigrations(); - const [ - crawler, - inference, - search, - tidyAssets, - video, - feed, - assetPreprocessing, - webhook, - ruleEngine, - httpServer, - ] = [ - await CrawlerWorker.build(), - OpenAiWorker.build(), - SearchIndexingWorker.build(), - TidyAssetsWorker.build(), - VideoWorker.build(), - FeedWorker.build(), - AssetPreprocessingWorker.build(), - WebhookWorker.build(), - RuleEngineWorker.build(), - buildServer(), - ]; - FeedRefreshingWorker.start(); + const httpServer = buildServer(); + + const workers = await Promise.all( + Object.entries(workerBuilders) + .filter(([name]) => isWorkerEnabled(name as WorkerName)) + .map(async ([name, builder]) => ({ + name: name as WorkerName, + worker: await builder(), + })), + ); + + if (workers.some((w) => w.name === "feed")) { + FeedRefreshingWorker.start(); + } await Promise.any([ Promise.all([ - crawler.run(), - inference.run(), - search.run(), - tidyAssets.run(), - video.run(), - feed.run(), - assetPreprocessing.run(), - webhook.run(), - ruleEngine.run(), + ...workers.map(({ worker }) => worker.run()), httpServer.serve(), ]), shutdownPromise, ]); + logger.info( - "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", + `Shutting down ${workers.map((w) => w.name).join(", ")} workers ...`, ); - FeedRefreshingWorker.stop(); - crawler.stop(); - inference.stop(); - search.stop(); - tidyAssets.stop(); - video.stop(); - feed.stop(); - assetPreprocessing.stop(); - webhook.stop(); - ruleEngine.stop(); + if (workers.some((w) => w.name === "feed")) { + FeedRefreshingWorker.stop(); + } + for (const { worker } of workers) { + worker.stop(); + } await httpServer.stop(); process.exit(0); } |
