aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/index.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-02-04 09:44:18 +0000
committerGitHub <noreply@github.com>2026-02-04 09:44:18 +0000
commit3c838ddb26c1e86d3f201ce71f13c834be705f69 (patch)
tree892fe4f8cd2ca01d6e4cd34f677fc16aa2fd63f6 /apps/workers/index.ts
parent3fcccb858ee3ef22fe9ce479af4ce458ac9a0fe1 (diff)
downloadkarakeep-3c838ddb26c1e86d3f201ce71f13c834be705f69.tar.zst
feat: Import workflow v3 (#2378)
* feat: import workflow v3 * batch stage * revert migration * cleanups * pr comments * move to models * add allowed workers * e2e tests * import list ids * add missing indicies * merge test * more fixes * add resume/pause to UI * fix ui states * fix tests * simplify progress tracking * remove backpressure * fix list imports * fix race on claiming bookmarks * remove the codex file
Diffstat (limited to 'apps/workers/index.ts')
-rw-r--r--apps/workers/index.ts15
1 files changed, 14 insertions, 1 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index dfbac85b..931a505f 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -28,6 +28,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
+import { ImportWorker } from "./workers/importWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
@@ -77,7 +78,7 @@ const workerBuilders = {
},
} as const;
-type WorkerName = keyof typeof workerBuilders;
+type WorkerName = keyof typeof workerBuilders | "import";
const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);
@@ -118,10 +119,19 @@ async function main() {
BackupSchedulingWorker.start();
}
+ // Start import polling worker
+ let importWorker: ImportWorker | null = null;
+ let importWorkerPromise: Promise<void> | null = null;
+ if (isWorkerEnabled("import")) {
+ importWorker = new ImportWorker();
+ importWorkerPromise = importWorker.start();
+ }
+
await Promise.any([
Promise.all([
...workers.map(({ worker }) => worker.run()),
httpServer.serve(),
+ ...(importWorkerPromise ? [importWorkerPromise] : []),
]),
shutdownPromise,
]);
@@ -136,6 +146,9 @@ async function main() {
if (workers.some((w) => w.name === "backup")) {
BackupSchedulingWorker.stop();
}
+ if (importWorker) {
+ importWorker.stop();
+ }
for (const { worker } of workers) {
worker.stop();
}