aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-07 15:32:24 +0100
committerGitHub <noreply@github.com>2025-09-07 15:32:24 +0100
commit492b15203807b4ceb00af4b301958344cc5a668f (patch)
tree9b91f58cc22cdb4a6a46bd61a968b38729ac8a65 /apps/workers
parent4cc86240757376a1f5893ad3fa52f45ff8826a88 (diff)
downloadkarakeep-492b15203807b4ceb00af4b301958344cc5a668f.tar.zst
feat(workers): add worker enable/disable lists (#1885)
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/index.ts93
1 files changed, 49 insertions, 44 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index f34e4722..cd860fc0 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -18,65 +18,70 @@ import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
+const workerBuilders = {
+ crawler: () => CrawlerWorker.build(),
+ inference: () => OpenAiWorker.build(),
+ search: () => SearchIndexingWorker.build(),
+ tidyAssets: () => TidyAssetsWorker.build(),
+ video: () => VideoWorker.build(),
+ feed: () => FeedWorker.build(),
+ assetPreprocessing: () => AssetPreprocessingWorker.build(),
+ webhook: () => WebhookWorker.build(),
+ ruleEngine: () => RuleEngineWorker.build(),
+} as const;
+
+type WorkerName = keyof typeof workerBuilders;
+const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
+const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);
+
+function isWorkerEnabled(name: WorkerName) {
+ if (enabledWorkers.size > 0 && !enabledWorkers.has(name)) {
+ return false;
+ }
+ if (disabledWorkers.has(name)) {
+ return false;
+ }
+ return true;
+}
+
async function main() {
await loadAllPlugins();
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
- const [
- crawler,
- inference,
- search,
- tidyAssets,
- video,
- feed,
- assetPreprocessing,
- webhook,
- ruleEngine,
- httpServer,
- ] = [
- await CrawlerWorker.build(),
- OpenAiWorker.build(),
- SearchIndexingWorker.build(),
- TidyAssetsWorker.build(),
- VideoWorker.build(),
- FeedWorker.build(),
- AssetPreprocessingWorker.build(),
- WebhookWorker.build(),
- RuleEngineWorker.build(),
- buildServer(),
- ];
- FeedRefreshingWorker.start();
+ const httpServer = buildServer();
+
+ const workers = await Promise.all(
+ Object.entries(workerBuilders)
+ .filter(([name]) => isWorkerEnabled(name as WorkerName))
+ .map(async ([name, builder]) => ({
+ name: name as WorkerName,
+ worker: await builder(),
+ })),
+ );
+
+ if (workers.some((w) => w.name === "feed")) {
+ FeedRefreshingWorker.start();
+ }
await Promise.any([
Promise.all([
- crawler.run(),
- inference.run(),
- search.run(),
- tidyAssets.run(),
- video.run(),
- feed.run(),
- assetPreprocessing.run(),
- webhook.run(),
- ruleEngine.run(),
+ ...workers.map(({ worker }) => worker.run()),
httpServer.serve(),
]),
shutdownPromise,
]);
+
logger.info(
- "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
+ `Shutting down ${workers.map((w) => w.name).join(", ")} workers ...`,
);
- FeedRefreshingWorker.stop();
- crawler.stop();
- inference.stop();
- search.stop();
- tidyAssets.stop();
- video.stop();
- feed.stop();
- assetPreprocessing.stop();
- webhook.stop();
- ruleEngine.stop();
+ if (workers.some((w) => w.name === "feed")) {
+ FeedRefreshingWorker.stop();
+ }
+ for (const { worker } of workers) {
+ worker.stop();
+ }
await httpServer.stop();
process.exit(0);
}