aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/index.ts
blob: b605b50f3130a05fab1bde5a0730754d6a990bfd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import "dotenv/config";

import { buildServer } from "server";

import {
  loadAllPlugins,
  prepareQueue,
  startQueue,
} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";

import { shutdownPromise } from "./exit";
import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";

const workerBuilders = {
  crawler: () => CrawlerWorker.build(),
  inference: () => OpenAiWorker.build(),
  search: () => SearchIndexingWorker.build(),
  adminMaintenance: () => AdminMaintenanceWorker.build(),
  video: () => VideoWorker.build(),
  feed: () => FeedWorker.build(),
  assetPreprocessing: () => AssetPreprocessingWorker.build(),
  webhook: () => WebhookWorker.build(),
  ruleEngine: () => RuleEngineWorker.build(),
  backup: () => BackupWorker.build(),
} as const;

type WorkerName = keyof typeof workerBuilders;
const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);

function isWorkerEnabled(name: WorkerName) {
  if (enabledWorkers.size > 0 && !enabledWorkers.has(name)) {
    return false;
  }
  if (disabledWorkers.has(name)) {
    return false;
  }
  return true;
}

async function main() {
  await loadAllPlugins();
  logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
  await prepareQueue();

  const httpServer = buildServer();

  const workers = await Promise.all(
    Object.entries(workerBuilders)
      .filter(([name]) => isWorkerEnabled(name as WorkerName))
      .map(async ([name, builder]) => ({
        name: name as WorkerName,
        worker: await builder(),
      })),
  );

  await startQueue();

  if (workers.some((w) => w.name === "feed")) {
    FeedRefreshingWorker.start();
  }

  if (workers.some((w) => w.name === "backup")) {
    BackupSchedulingWorker.start();
  }

  await Promise.any([
    Promise.all([
      ...workers.map(({ worker }) => worker.run()),
      httpServer.serve(),
    ]),
    shutdownPromise,
  ]);

  logger.info(
    `Shutting down ${workers.map((w) => w.name).join(", ")} workers ...`,
  );

  if (workers.some((w) => w.name === "feed")) {
    FeedRefreshingWorker.stop();
  }
  if (workers.some((w) => w.name === "backup")) {
    BackupSchedulingWorker.stop();
  }
  for (const { worker } of workers) {
    worker.stop();
  }
  await httpServer.stop();
  process.exit(0);
}

main();