From 3c838ddb26c1e86d3f201ce71f13c834be705f69 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Wed, 4 Feb 2026 09:44:18 +0000 Subject: feat: Import workflow v3 (#2378) * feat: import workflow v3 * batch stage * revert migration * cleanups * pr comments * move to models * add allowed workers * e2e tests * import list ids * add missing indicies * merge test * more fixes * add resume/pause to UI * fix ui states * fix tests * simplify progress tracking * remove backpressure * fix list imports * fix race on claiming bookmarks * remove the codex file --- apps/workers/index.ts | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'apps/workers/index.ts') diff --git a/apps/workers/index.ts b/apps/workers/index.ts index dfbac85b..931a505f 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -28,6 +28,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker"; import { CrawlerWorker } from "./workers/crawlerWorker"; import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; +import { ImportWorker } from "./workers/importWorker"; import { OpenAiWorker } from "./workers/inference/inferenceWorker"; import { RuleEngineWorker } from "./workers/ruleEngineWorker"; import { SearchIndexingWorker } from "./workers/searchWorker"; @@ -77,7 +78,7 @@ const workerBuilders = { }, } as const; -type WorkerName = keyof typeof workerBuilders; +type WorkerName = keyof typeof workerBuilders | "import"; const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); @@ -118,10 +119,19 @@ async function main() { BackupSchedulingWorker.start(); } + // Start import polling worker + let importWorker: ImportWorker | null = null; + let importWorkerPromise: Promise | null = null; + if (isWorkerEnabled("import")) { + importWorker = new ImportWorker(); + importWorkerPromise = importWorker.start(); + } + await Promise.any([ Promise.all([ ...workers.map(({ worker }) => worker.run()), httpServer.serve(), + ...(importWorkerPromise ? [importWorkerPromise] : []), ]), shutdownPromise, ]); @@ -136,6 +146,9 @@ async function main() { if (workers.some((w) => w.name === "backup")) { BackupSchedulingWorker.stop(); } + if (importWorker) { + importWorker.stop(); + } for (const { worker } of workers) { worker.stop(); } -- cgit v1.2.3-70-g09d2