aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-09-07 15:32:24 +0100
committerGitHub <noreply@github.com>2025-09-07 15:32:24 +0100
commit492b15203807b4ceb00af4b301958344cc5a668f (patch)
tree9b91f58cc22cdb4a6a46bd61a968b38729ac8a65
parent4cc86240757376a1f5893ad3fa52f45ff8826a88 (diff)
downloadkarakeep-492b15203807b4ceb00af4b301958344cc5a668f.tar.zst
feat(workers): add worker enable/disable lists (#1885)
-rw-r--r--apps/workers/index.ts93
-rw-r--r--docs/docs/03-configuration.md2
-rw-r--r--packages/shared/config.ts20
3 files changed, 71 insertions, 44 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index f34e4722..cd860fc0 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -18,65 +18,70 @@ import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
+const workerBuilders = {
+ crawler: () => CrawlerWorker.build(),
+ inference: () => OpenAiWorker.build(),
+ search: () => SearchIndexingWorker.build(),
+ tidyAssets: () => TidyAssetsWorker.build(),
+ video: () => VideoWorker.build(),
+ feed: () => FeedWorker.build(),
+ assetPreprocessing: () => AssetPreprocessingWorker.build(),
+ webhook: () => WebhookWorker.build(),
+ ruleEngine: () => RuleEngineWorker.build(),
+} as const;
+
+type WorkerName = keyof typeof workerBuilders;
+const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
+const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);
+
+function isWorkerEnabled(name: WorkerName) {
+ if (enabledWorkers.size > 0 && !enabledWorkers.has(name)) {
+ return false;
+ }
+ if (disabledWorkers.has(name)) {
+ return false;
+ }
+ return true;
+}
+
async function main() {
await loadAllPlugins();
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
- const [
- crawler,
- inference,
- search,
- tidyAssets,
- video,
- feed,
- assetPreprocessing,
- webhook,
- ruleEngine,
- httpServer,
- ] = [
- await CrawlerWorker.build(),
- OpenAiWorker.build(),
- SearchIndexingWorker.build(),
- TidyAssetsWorker.build(),
- VideoWorker.build(),
- FeedWorker.build(),
- AssetPreprocessingWorker.build(),
- WebhookWorker.build(),
- RuleEngineWorker.build(),
- buildServer(),
- ];
- FeedRefreshingWorker.start();
+ const httpServer = buildServer();
+
+ const workers = await Promise.all(
+ Object.entries(workerBuilders)
+ .filter(([name]) => isWorkerEnabled(name as WorkerName))
+ .map(async ([name, builder]) => ({
+ name: name as WorkerName,
+ worker: await builder(),
+ })),
+ );
+
+ if (workers.some((w) => w.name === "feed")) {
+ FeedRefreshingWorker.start();
+ }
await Promise.any([
Promise.all([
- crawler.run(),
- inference.run(),
- search.run(),
- tidyAssets.run(),
- video.run(),
- feed.run(),
- assetPreprocessing.run(),
- webhook.run(),
- ruleEngine.run(),
+ ...workers.map(({ worker }) => worker.run()),
httpServer.serve(),
]),
shutdownPromise,
]);
+
logger.info(
- "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
+ `Shutting down ${workers.map((w) => w.name).join(", ")} workers ...`,
);
- FeedRefreshingWorker.stop();
- crawler.stop();
- inference.stop();
- search.stop();
- tidyAssets.stop();
- video.stop();
- feed.stop();
- assetPreprocessing.stop();
- webhook.stop();
- ruleEngine.stop();
+ if (workers.some((w) => w.name === "feed")) {
+ FeedRefreshingWorker.stop();
+ }
+ for (const { worker } of workers) {
+ worker.stop();
+ }
await httpServer.stop();
process.exit(0);
}
diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md
index bd53f1bb..aae1ffa3 100644
--- a/docs/docs/03-configuration.md
+++ b/docs/docs/03-configuration.md
@@ -6,6 +6,8 @@ The app is mainly configured by environment variables. All the used environment
| ------------------------------- | ------------------------------------- | --------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| PORT | No | 3000 | The port on which the web server will listen. DON'T CHANGE THIS IF YOU'RE USING DOCKER; instead, change the Docker-bound external port. |
| WORKERS_PORT | No | 0 (Random Port) | The port on which the worker will export its prometheus metrics on `/metrics`. By default it's a random unused port. If you want to utilize those metrics, fix the port to a value (and export it in docker if you're using docker). |
+| WORKERS_ENABLED_WORKERS | No | Not set | Comma-separated list of worker names to enable. If set, only these workers will run. Valid values: crawler,inference,search,tidyAssets,video,feed,assetPreprocessing,webhook,ruleEngine. |
+| WORKERS_DISABLED_WORKERS | No | Not set | Comma-separated list of worker names to disable. Takes precedence over `WORKERS_ENABLED_WORKERS`. |
| DATA_DIR | Yes | Not set | The path for the persistent data directory. This is where the db lives. Assets are stored here by default unless `ASSETS_DIR` is set. |
| ASSETS_DIR | No | Not set | The path where crawled assets will be stored. If not set, defaults to `${DATA_DIR}/assets`. |
| NEXTAUTH_URL | Yes | Not set | Should point to the address of your server. The app will function without it, but will redirect you to wrong addresses on signout for example. |
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index 15c2b4a5..3cc65f4c 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -20,6 +20,24 @@ const allEnv = z.object({
PORT: z.coerce.number().default(3000),
WORKERS_HOST: z.string().default("127.0.0.1"),
WORKERS_PORT: z.coerce.number().default(0),
+ WORKERS_ENABLED_WORKERS: z
+ .string()
+ .default("")
+ .transform((val) =>
+ val
+ .split(",")
+ .map((w) => w.trim())
+ .filter((w) => w),
+ ),
+ WORKERS_DISABLED_WORKERS: z
+ .string()
+ .default("")
+ .transform((val) =>
+ val
+ .split(",")
+ .map((w) => w.trim())
+ .filter((w) => w),
+ ),
API_URL: z.string().url().default("http://localhost:3000"),
NEXTAUTH_URL: z
.string()
@@ -153,6 +171,8 @@ const serverConfigSchema = allEnv.transform((val, ctx) => {
workers: {
host: val.WORKERS_HOST,
port: val.WORKERS_PORT,
+ enabledWorkers: val.WORKERS_ENABLED_WORKERS,
+ disabledWorkers: val.WORKERS_DISABLED_WORKERS,
},
apiUrl: val.API_URL,
publicUrl: val.NEXTAUTH_URL,