Diffstat (limited to 'apps/workers/crawlerWorker.ts')
-rw-r--r--  apps/workers/crawlerWorker.ts | 60 +++++++++++++++++++++++++++++-------------------------------
1 file changed, 29 insertions(+), 31 deletions(-)
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index ddf61fc8..a1917523 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -1,11 +1,9 @@
 import assert from "assert";
 import * as dns from "dns";
 import * as path from "node:path";
-import type { Job } from "bullmq";
 import type { Browser } from "puppeteer";
 import { Readability } from "@mozilla/readability";
 import { Mutex } from "async-mutex";
-import { Worker } from "bullmq";
 import DOMPurify from "dompurify";
 import { eq } from "drizzle-orm";
 import { execa } from "execa";
@@ -34,6 +32,7 @@ import {
   bookmarkLinks,
   bookmarks,
 } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import {
   ASSET_TYPES,
   deleteAsset,
@@ -48,7 +47,6 @@ import logger from "@hoarder/shared/logger";
 import {
   LinkCrawlerQueue,
   OpenAIQueue,
-  queueConnectionDetails,
   triggerSearchReindex,
   zCrawlLinkRequestSchema,
 } from "@hoarder/shared/queues";
@@ -153,37 +151,37 @@ export class CrawlerWorker {
     }
 
     logger.info("Starting crawler worker ...");
-    const worker = new Worker<ZCrawlLinkRequest, void>(
-      LinkCrawlerQueue.name,
-      withTimeout(
-        runCrawler,
-        /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
-      ),
+    const worker = new Runner<ZCrawlLinkRequest>(
+      LinkCrawlerQueue,
       {
+        run: withTimeout(
+          runCrawler,
+          /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
+        ),
+        onComplete: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[Crawler][${jobId}] Completed successfully`);
+          const bookmarkId = job?.data.bookmarkId;
+          if (bookmarkId) {
+            await changeBookmarkStatus(bookmarkId, "success");
+          }
+        },
+        onError: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(`[Crawler][${jobId}] Crawling job failed: ${job.error}`);
+          const bookmarkId = job.data?.bookmarkId;
+          if (bookmarkId) {
+            await changeBookmarkStatus(bookmarkId, "failure");
+          }
+        },
+      },
+      {
+        pollIntervalMs: 1000,
+        timeoutSecs: serverConfig.crawler.jobTimeoutSec,
         concurrency: serverConfig.crawler.numWorkers,
-        connection: queueConnectionDetails,
-        autorun: false,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[Crawler][${jobId}] Completed successfully`);
-      const bookmarkId = job?.data.bookmarkId;
-      if (bookmarkId) {
-        changeBookmarkStatus(bookmarkId, "success");
-      }
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
-      const bookmarkId = job?.data.bookmarkId;
-      if (bookmarkId) {
-        changeBookmarkStatus(bookmarkId, "failure");
-      }
-    });
-
     return worker;
   }
 }
@@ -600,7 +598,7 @@ async function crawlAndParseUrl(
   };
 }
 
-async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   const jobId = job.id ?? "unknown";
 
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -655,7 +653,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
 
   // Enqueue openai job (if not set, assume it's true for backward compatibility)
   if (job.data.runInference !== false) {
-    OpenAIQueue.add("openai", {
+    OpenAIQueue.enqueue({
       bookmarkId,
     });
   }
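For context when reading the hunks above: this change swaps BullMQ's Worker for a Runner from the in-repo @hoarder/queue package. The declarations below are a minimal sketch of what that package's types appear to look like, inferred purely from how they are used in this diff (job.id, job.data, job.error, the three-argument Runner constructor, and OpenAIQueue.enqueue). The real definitions live in @hoarder/queue and may differ; Queue<T> in particular is a hypothetical stand-in for the type of LinkCrawlerQueue.

// Sketch only: shapes inferred from usage in this diff, not the
// actual @hoarder/queue declarations.
interface Queue<T> {
  // Hypothetical; matches the OpenAIQueue.enqueue({ bookmarkId }) call above.
  enqueue(payload: T): Promise<void>;
}

interface DequeuedJob<T> {
  id?: string; // read as `job.id ?? "unknown"` in runCrawler
  data: T;     // validated via zCrawlLinkRequestSchema.safeParse(job.data)
  error?: Error; // interpolated into the onError log message
}

interface RunnerCallbacks<T> {
  run: (job: DequeuedJob<T>) => Promise<void>;
  onComplete?: (job: DequeuedJob<T>) => Promise<void>;
  onError?: (job: DequeuedJob<T>) => Promise<void>;
}

interface RunnerOptions {
  pollIntervalMs: number; // polling model; note the removal of queueConnectionDetails
  timeoutSecs: number;
  concurrency: number;
}

declare class Runner<T> {
  constructor(queue: Queue<T>, callbacks: RunnerCallbacks<T>, options: RunnerOptions);
}

One practical consequence visible in the diff: completion and failure handling move from emitter events (worker.on("completed"), worker.on("failed")) into async onComplete/onError callbacks, so the changeBookmarkStatus calls are now awaited rather than fire-and-forget.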
