aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/index.ts
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--apps/workers/index.ts84
1 files changed, 73 insertions, 11 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index b605b50f..c7b9533d 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -3,9 +3,22 @@ import "dotenv/config";
import { buildServer } from "server";
import {
+ AdminMaintenanceQueue,
+ AssetPreprocessingQueue,
+ BackupQueue,
+ FeedQueue,
+ initTracing,
+ LinkCrawlerQueue,
loadAllPlugins,
+ LowPriorityCrawlerQueue,
+ OpenAIQueue,
prepareQueue,
+ RuleEngineQueue,
+ SearchIndexingQueue,
+ shutdownTracing,
startQueue,
+ VideoWorkerQueue,
+ WebhookQueue,
} from "@karakeep/shared-server";
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
@@ -16,6 +29,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
+import { ImportWorker } from "./workers/importWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
@@ -23,19 +37,53 @@ import { VideoWorker } from "./workers/videoWorker";
import { WebhookWorker } from "./workers/webhookWorker";
const workerBuilders = {
- crawler: () => CrawlerWorker.build(),
- inference: () => OpenAiWorker.build(),
- search: () => SearchIndexingWorker.build(),
- adminMaintenance: () => AdminMaintenanceWorker.build(),
- video: () => VideoWorker.build(),
- feed: () => FeedWorker.build(),
- assetPreprocessing: () => AssetPreprocessingWorker.build(),
- webhook: () => WebhookWorker.build(),
- ruleEngine: () => RuleEngineWorker.build(),
- backup: () => BackupWorker.build(),
+ crawler: async () => {
+ await LinkCrawlerQueue.ensureInit();
+ return CrawlerWorker.build(LinkCrawlerQueue);
+ },
+ lowPriorityCrawler: async () => {
+ await LowPriorityCrawlerQueue.ensureInit();
+ return CrawlerWorker.build(LowPriorityCrawlerQueue);
+ },
+ inference: async () => {
+ await OpenAIQueue.ensureInit();
+ return OpenAiWorker.build();
+ },
+ search: async () => {
+ await SearchIndexingQueue.ensureInit();
+ return SearchIndexingWorker.build();
+ },
+ adminMaintenance: async () => {
+ await AdminMaintenanceQueue.ensureInit();
+ return AdminMaintenanceWorker.build();
+ },
+ video: async () => {
+ await VideoWorkerQueue.ensureInit();
+ return VideoWorker.build();
+ },
+ feed: async () => {
+ await FeedQueue.ensureInit();
+ return FeedWorker.build();
+ },
+ assetPreprocessing: async () => {
+ await AssetPreprocessingQueue.ensureInit();
+ return AssetPreprocessingWorker.build();
+ },
+ webhook: async () => {
+ await WebhookQueue.ensureInit();
+ return WebhookWorker.build();
+ },
+ ruleEngine: async () => {
+ await RuleEngineQueue.ensureInit();
+ return RuleEngineWorker.build();
+ },
+ backup: async () => {
+ await BackupQueue.ensureInit();
+ return BackupWorker.build();
+ },
} as const;
-type WorkerName = keyof typeof workerBuilders;
+type WorkerName = keyof typeof workerBuilders | "import";
const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);
@@ -51,6 +99,7 @@ function isWorkerEnabled(name: WorkerName) {
async function main() {
await loadAllPlugins();
+ initTracing("workers");
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
await prepareQueue();
@@ -75,10 +124,19 @@ async function main() {
BackupSchedulingWorker.start();
}
+ // Start import polling worker
+ let importWorker: ImportWorker | null = null;
+ let importWorkerPromise: Promise<void> | null = null;
+ if (isWorkerEnabled("import")) {
+ importWorker = new ImportWorker();
+ importWorkerPromise = importWorker.start();
+ }
+
await Promise.any([
Promise.all([
...workers.map(({ worker }) => worker.run()),
httpServer.serve(),
+ ...(importWorkerPromise ? [importWorkerPromise] : []),
]),
shutdownPromise,
]);
@@ -93,10 +151,14 @@ async function main() {
if (workers.some((w) => w.name === "backup")) {
BackupSchedulingWorker.stop();
}
+ if (importWorker) {
+ importWorker.stop();
+ }
for (const { worker } of workers) {
worker.stop();
}
await httpServer.stop();
+ await shutdownTracing();
process.exit(0);
}