From 9edd154440c18bcc4542560e229eb293f9e0c2d4 Mon Sep 17 00:00:00 2001
From: Mohamed Bassem
Date: Sun, 21 Jul 2024 19:18:58 +0100
Subject: refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)

---
 apps/workers/crawlerWorker.ts | 60 +++++++++++++++++++++----------------------
 apps/workers/index.ts         |  3 +++
 apps/workers/openaiWorker.ts  | 43 ++++++++++++++++---------------
 apps/workers/package.json     |  2 +-
 apps/workers/searchWorker.ts  | 39 ++++++++++++++--------------
 5 files changed, 75 insertions(+), 72 deletions(-)

(limited to 'apps/workers')

diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index ddf61fc8..a1917523 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -1,11 +1,9 @@
 import assert from "assert";
 import * as dns from "dns";
 import * as path from "node:path";
-import type { Job } from "bullmq";
 import type { Browser } from "puppeteer";
 import { Readability } from "@mozilla/readability";
 import { Mutex } from "async-mutex";
-import { Worker } from "bullmq";
 import DOMPurify from "dompurify";
 import { eq } from "drizzle-orm";
 import { execa } from "execa";
@@ -34,6 +32,7 @@ import {
   bookmarkLinks,
   bookmarks,
 } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import {
   ASSET_TYPES,
   deleteAsset,
@@ -48,7 +47,6 @@ import logger from "@hoarder/shared/logger";
 import {
   LinkCrawlerQueue,
   OpenAIQueue,
-  queueConnectionDetails,
   triggerSearchReindex,
   zCrawlLinkRequestSchema,
 } from "@hoarder/shared/queues";
@@ -153,37 +151,37 @@ export class CrawlerWorker {
     }
 
     logger.info("Starting crawler worker ...");
-    const worker = new Worker(
-      LinkCrawlerQueue.name,
-      withTimeout(
-        runCrawler,
-        /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
-      ),
+    const worker = new Runner(
+      LinkCrawlerQueue,
       {
+        run: withTimeout(
+          runCrawler,
+          /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
+        ),
+        onComplete: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[Crawler][${jobId}] Completed successfully`);
+          const bookmarkId = job?.data.bookmarkId;
+          if (bookmarkId) {
+            await changeBookmarkStatus(bookmarkId, "success");
+          }
+        },
+        onError: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(`[Crawler][${jobId}] Crawling job failed: ${job.error}`);
+          const bookmarkId = job.data?.bookmarkId;
+          if (bookmarkId) {
+            await changeBookmarkStatus(bookmarkId, "failure");
+          }
+        },
+      },
+      {
+        pollIntervalMs: 1000,
+        timeoutSecs: serverConfig.crawler.jobTimeoutSec,
         concurrency: serverConfig.crawler.numWorkers,
-        connection: queueConnectionDetails,
-        autorun: false,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[Crawler][${jobId}] Completed successfully`);
-      const bookmarkId = job?.data.bookmarkId;
-      if (bookmarkId) {
-        changeBookmarkStatus(bookmarkId, "success");
-      }
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
-      const bookmarkId = job?.data.bookmarkId;
-      if (bookmarkId) {
-        changeBookmarkStatus(bookmarkId, "failure");
-      }
-    });
-
     return worker;
   }
 }
@@ -600,7 +598,7 @@ async function crawlAndParseUrl(
   };
 }
 
-async function runCrawler(job: Job) {
+async function runCrawler(job: DequeuedJob) {
   const jobId = job.id ?? "unknown";
 
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -655,7 +653,7 @@ async function runCrawler(job: Job) {
 
   // Enqueue openai job (if not set, assume it's true for backward compatibility)
   if (job.data.runInference !== false) {
-    OpenAIQueue.add("openai", {
+    OpenAIQueue.enqueue({
      bookmarkId,
     });
   }
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 687d9ced..39741aa8 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,6 +2,7 @@ import "dotenv/config";
 
 import serverConfig from "@hoarder/shared/config";
 import logger from "@hoarder/shared/logger";
+import { runQueueDBMigrations } from "@hoarder/shared/queues";
 
 import { CrawlerWorker } from "./crawlerWorker";
 import { shutdownPromise } from "./exit";
@@ -10,6 +11,8 @@ import { SearchIndexingWorker } from "./searchWorker";
 
 async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
+  runQueueDBMigrations();
+
   const [crawler, openai, search] = [
     await CrawlerWorker.build(),
     OpenAiWorker.build(),
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 776d6828..9e6e2f23 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -1,5 +1,3 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
 import { and, Column, eq, inArray, sql } from "drizzle-orm";
 import { z } from "zod";
 
@@ -11,12 +9,12 @@ import {
   bookmarkTags,
   tagsOnBookmarks,
 } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import { readAsset } from "@hoarder/shared/assetdb";
 import serverConfig from "@hoarder/shared/config";
 import logger from "@hoarder/shared/logger";
 import {
   OpenAIQueue,
-  queueConnectionDetails,
   triggerSearchReindex,
   zOpenAIRequestSchema,
 } from "@hoarder/shared/queues";
@@ -63,27 +61,30 @@ async function attemptMarkTaggingStatus(
 export class OpenAiWorker {
   static build() {
     logger.info("Starting inference worker ...");
-    const worker = new Worker(
-      OpenAIQueue.name,
-      runOpenAI,
+    const worker = new Runner(
+      OpenAIQueue,
       {
-        connection: queueConnectionDetails,
-        autorun: false,
+        run: runOpenAI,
+        onComplete: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[inference][${jobId}] Completed successfully`);
+          await attemptMarkTaggingStatus(job?.data, "success");
+        },
+        onError: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(
+            `[inference][${jobId}] inference job failed: ${job.error}`,
+          );
+          await attemptMarkTaggingStatus(job?.data, "failure");
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[inference][${jobId}] Completed successfully`);
-      attemptMarkTaggingStatus(job?.data, "success");
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[inference][${jobId}] inference job failed: ${error}`);
-      attemptMarkTaggingStatus(job?.data, "failure");
-    });
-
     return worker;
   }
 }
@@ -361,7 +362,7 @@ async function connectTags(
   });
 }
 
-async function runOpenAI(job: Job) {
+async function runOpenAI(job: DequeuedJob) {
   const jobId = job.id ?? "unknown";
 
   const inferenceClient = InferenceClientFactory.build();
diff --git a/apps/workers/package.json b/apps/workers/package.json
index b74f9ec9..471606f2 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -7,10 +7,10 @@
     "@hoarder/db": "workspace:^0.1.0",
     "@hoarder/shared": "workspace:^0.1.0",
     "@hoarder/tsconfig": "workspace:^0.1.0",
+    "@hoarder/queue": "workspace:^0.1.0",
     "@mozilla/readability": "^0.5.0",
     "@tsconfig/node21": "^21.0.1",
     "async-mutex": "^0.4.1",
-    "bullmq": "^5.1.9",
     "dompurify": "^3.0.9",
     "dotenv": "^16.4.1",
     "drizzle-orm": "^0.29.4",
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
index 75c057c6..56c2aac4 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/searchWorker.ts
@@ -1,13 +1,11 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
 import { eq } from "drizzle-orm";
 
 import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
 import { db } from "@hoarder/db";
 import { bookmarks } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import logger from "@hoarder/shared/logger";
 import {
-  queueConnectionDetails,
   SearchIndexingQueue,
   zSearchIndexingRequestSchema,
 } from "@hoarder/shared/queues";
@@ -16,25 +14,28 @@ import { getSearchIdxClient } from "@hoarder/shared/search";
 export class SearchIndexingWorker {
   static build() {
     logger.info("Starting search indexing worker ...");
-    const worker = new Worker(
-      SearchIndexingQueue.name,
-      runSearchIndexing,
+    const worker = new Runner(
+      SearchIndexingQueue,
       {
-        connection: queueConnectionDetails,
-        autorun: false,
+        run: runSearchIndexing,
+        onComplete: (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[search][${jobId}] Completed successfully`);
+          return Promise.resolve();
+        },
+        onError: (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(`[search][${jobId}] search job failed: ${job.error}`);
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[search][${jobId}] Completed successfully`);
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[search][${jobId}] search job failed: ${error}`);
-    });
-
     return worker;
   }
 }
@@ -112,7 +113,7 @@ async function runDelete(
   await ensureTaskSuccess(searchClient, task.taskUid);
 }
 
-async function runSearchIndexing(job: Job) {
+async function runSearchIndexing(job: DequeuedJob) {
   const jobId = job.id ?? "unknown";
 
   const request = zSearchIndexingRequestSchema.safeParse(job.data);
-- 
cgit v1.2.3-70-g09d2