diff options
Diffstat (limited to '')
| -rw-r--r-- | apps/workers/index.ts | 84 |
1 files changed, 73 insertions, 11 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index b605b50f..c7b9533d 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -3,9 +3,22 @@ import "dotenv/config"; import { buildServer } from "server"; import { + AdminMaintenanceQueue, + AssetPreprocessingQueue, + BackupQueue, + FeedQueue, + initTracing, + LinkCrawlerQueue, loadAllPlugins, + LowPriorityCrawlerQueue, + OpenAIQueue, prepareQueue, + RuleEngineQueue, + SearchIndexingQueue, + shutdownTracing, startQueue, + VideoWorkerQueue, + WebhookQueue, } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; @@ -16,6 +29,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker"; import { CrawlerWorker } from "./workers/crawlerWorker"; import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; +import { ImportWorker } from "./workers/importWorker"; import { OpenAiWorker } from "./workers/inference/inferenceWorker"; import { RuleEngineWorker } from "./workers/ruleEngineWorker"; import { SearchIndexingWorker } from "./workers/searchWorker"; @@ -23,19 +37,53 @@ import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; const workerBuilders = { - crawler: () => CrawlerWorker.build(), - inference: () => OpenAiWorker.build(), - search: () => SearchIndexingWorker.build(), - adminMaintenance: () => AdminMaintenanceWorker.build(), - video: () => VideoWorker.build(), - feed: () => FeedWorker.build(), - assetPreprocessing: () => AssetPreprocessingWorker.build(), - webhook: () => WebhookWorker.build(), - ruleEngine: () => RuleEngineWorker.build(), - backup: () => BackupWorker.build(), + crawler: async () => { + await LinkCrawlerQueue.ensureInit(); + return CrawlerWorker.build(LinkCrawlerQueue); + }, + lowPriorityCrawler: async () => { + await LowPriorityCrawlerQueue.ensureInit(); + return CrawlerWorker.build(LowPriorityCrawlerQueue); + }, + inference: async () => { + await OpenAIQueue.ensureInit(); + return OpenAiWorker.build(); + }, + search: async () => { + await SearchIndexingQueue.ensureInit(); + return SearchIndexingWorker.build(); + }, + adminMaintenance: async () => { + await AdminMaintenanceQueue.ensureInit(); + return AdminMaintenanceWorker.build(); + }, + video: async () => { + await VideoWorkerQueue.ensureInit(); + return VideoWorker.build(); + }, + feed: async () => { + await FeedQueue.ensureInit(); + return FeedWorker.build(); + }, + assetPreprocessing: async () => { + await AssetPreprocessingQueue.ensureInit(); + return AssetPreprocessingWorker.build(); + }, + webhook: async () => { + await WebhookQueue.ensureInit(); + return WebhookWorker.build(); + }, + ruleEngine: async () => { + await RuleEngineQueue.ensureInit(); + return RuleEngineWorker.build(); + }, + backup: async () => { + await BackupQueue.ensureInit(); + return BackupWorker.build(); + }, } as const; -type WorkerName = keyof typeof workerBuilders; +type WorkerName = keyof typeof workerBuilders | "import"; const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); @@ -51,6 +99,7 @@ function isWorkerEnabled(name: WorkerName) { async function main() { await loadAllPlugins(); + initTracing("workers"); logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`); await prepareQueue(); @@ -75,10 +124,19 @@ async function main() { BackupSchedulingWorker.start(); } + // Start import polling worker + let importWorker: ImportWorker | null = null; + let importWorkerPromise: Promise<void> | null = null; + if (isWorkerEnabled("import")) { + importWorker = new ImportWorker(); + importWorkerPromise = importWorker.start(); + } + await Promise.any([ Promise.all([ ...workers.map(({ worker }) => worker.run()), httpServer.serve(), + ...(importWorkerPromise ? [importWorkerPromise] : []), ]), shutdownPromise, ]); @@ -93,10 +151,14 @@ async function main() { if (workers.some((w) => w.name === "backup")) { BackupSchedulingWorker.stop(); } + if (importWorker) { + importWorker.stop(); + } for (const { worker } of workers) { worker.stop(); } await httpServer.stop(); + await shutdownTracing(); process.exit(0); } |
