| author | Mohamed Bassem <me@mbassem.com> | 2024-07-21 19:18:58 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-07-21 19:18:58 +0100 |
| commit | 9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch) | |
| tree | 2423f82619d48656f8dc60870fab8b152eef4401 /apps | |
| parent | edbd98d7841388d1169a3a3b159367487bda431e (diff) | |
| download | karakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst | |
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
Diffstat (limited to 'apps')
| Mode | Path | Lines changed |
|---|---|---|
| -rw-r--r-- | apps/web/components/dashboard/admin/ServerStats.tsx | 6 |
| -rw-r--r-- | apps/workers/crawlerWorker.ts | 60 |
| -rw-r--r-- | apps/workers/index.ts | 3 |
| -rw-r--r-- | apps/workers/openaiWorker.ts | 43 |
| -rw-r--r-- | apps/workers/package.json | 2 |
| -rw-r--r-- | apps/workers/searchWorker.ts | 39 |
6 files changed, 78 insertions, 75 deletions
```diff
diff --git a/apps/web/components/dashboard/admin/ServerStats.tsx b/apps/web/components/dashboard/admin/ServerStats.tsx
index 06e3421f..e95dc437 100644
--- a/apps/web/components/dashboard/admin/ServerStats.tsx
+++ b/apps/web/components/dashboard/admin/ServerStats.tsx
@@ -106,19 +106,19 @@ export default function ServerStats() {
         <TableBody>
           <TableRow>
             <TableCell className="lg:w-2/3">Crawling Jobs</TableCell>
-            <TableCell>{serverStats.crawlStats.queuedInRedis}</TableCell>
+            <TableCell>{serverStats.crawlStats.queued}</TableCell>
             <TableCell>{serverStats.crawlStats.pending}</TableCell>
             <TableCell>{serverStats.crawlStats.failed}</TableCell>
           </TableRow>
           <TableRow>
             <TableCell>Indexing Jobs</TableCell>
-            <TableCell>{serverStats.indexingStats.queuedInRedis}</TableCell>
+            <TableCell>{serverStats.indexingStats.queued}</TableCell>
             <TableCell>-</TableCell>
             <TableCell>-</TableCell>
           </TableRow>
           <TableRow>
             <TableCell>Inference Jobs</TableCell>
-            <TableCell>{serverStats.inferenceStats.queuedInRedis}</TableCell>
+            <TableCell>{serverStats.inferenceStats.queued}</TableCell>
             <TableCell>{serverStats.inferenceStats.pending}</TableCell>
             <TableCell>{serverStats.inferenceStats.failed}</TableCell>
           </TableRow>
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index ddf61fc8..a1917523 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -1,11 +1,9 @@
 import assert from "assert";
 import * as dns from "dns";
 import * as path from "node:path";
-import type { Job } from "bullmq";
 import type { Browser } from "puppeteer";
 import { Readability } from "@mozilla/readability";
 import { Mutex } from "async-mutex";
-import { Worker } from "bullmq";
 import DOMPurify from "dompurify";
 import { eq } from "drizzle-orm";
 import { execa } from "execa";
@@ -34,6 +32,7 @@ import {
   bookmarkLinks,
   bookmarks,
 } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import {
   ASSET_TYPES,
   deleteAsset,
@@ -48,7 +47,6 @@ import logger from "@hoarder/shared/logger";
 import {
   LinkCrawlerQueue,
   OpenAIQueue,
-  queueConnectionDetails,
   triggerSearchReindex,
   zCrawlLinkRequestSchema,
 } from "@hoarder/shared/queues";
@@ -153,37 +151,37 @@ export class CrawlerWorker {
     }
 
     logger.info("Starting crawler worker ...");
-    const worker = new Worker<ZCrawlLinkRequest, void>(
-      LinkCrawlerQueue.name,
-      withTimeout(
-        runCrawler,
-        /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
-      ),
+    const worker = new Runner<ZCrawlLinkRequest>(
+      LinkCrawlerQueue,
       {
+        run: withTimeout(
+          runCrawler,
+          /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
+        ),
+        onComplete: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[Crawler][${jobId}] Completed successfully`);
+          const bookmarkId = job?.data.bookmarkId;
+          if (bookmarkId) {
+            await changeBookmarkStatus(bookmarkId, "success");
+          }
+        },
+        onError: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(`[Crawler][${jobId}] Crawling job failed: ${job.error}`);
+          const bookmarkId = job.data?.bookmarkId;
+          if (bookmarkId) {
+            await changeBookmarkStatus(bookmarkId, "failure");
+          }
+        },
+      },
+      {
+        pollIntervalMs: 1000,
+        timeoutSecs: serverConfig.crawler.jobTimeoutSec,
         concurrency: serverConfig.crawler.numWorkers,
-        connection: queueConnectionDetails,
-        autorun: false,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[Crawler][${jobId}] Completed successfully`);
-      const bookmarkId = job?.data.bookmarkId;
-      if (bookmarkId) {
-        changeBookmarkStatus(bookmarkId, "success");
-      }
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
-      const bookmarkId = job?.data.bookmarkId;
-      if (bookmarkId) {
-        changeBookmarkStatus(bookmarkId, "failure");
-      }
-    });
-
     return worker;
   }
 }
@@ -600,7 +598,7 @@ async function crawlAndParseUrl(
   };
 }
 
-async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   const jobId = job.id ?? "unknown";
 
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -655,7 +653,7 @@
 
   // Enqueue openai job (if not set, assume it's true for backward compatibility)
   if (job.data.runInference !== false) {
-    OpenAIQueue.add("openai", {
+    OpenAIQueue.enqueue({
       bookmarkId,
     });
   }
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 687d9ced..39741aa8 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,6 +2,7 @@ import "dotenv/config";
 
 import serverConfig from "@hoarder/shared/config";
 import logger from "@hoarder/shared/logger";
+import { runQueueDBMigrations } from "@hoarder/shared/queues";
 
 import { CrawlerWorker } from "./crawlerWorker";
 import { shutdownPromise } from "./exit";
@@ -10,6 +11,8 @@ import { SearchIndexingWorker } from "./searchWorker";
 async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
 
+  runQueueDBMigrations();
+
   const [crawler, openai, search] = [
     await CrawlerWorker.build(),
     OpenAiWorker.build(),
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 776d6828..9e6e2f23 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -1,5 +1,3 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
 import { and, Column, eq, inArray, sql } from "drizzle-orm";
 import { z } from "zod";
 
@@ -11,12 +9,12 @@ import {
   bookmarkTags,
   tagsOnBookmarks,
 } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import { readAsset } from "@hoarder/shared/assetdb";
 import serverConfig from "@hoarder/shared/config";
 import logger from "@hoarder/shared/logger";
 import {
   OpenAIQueue,
-  queueConnectionDetails,
   triggerSearchReindex,
   zOpenAIRequestSchema,
 } from "@hoarder/shared/queues";
@@ -63,27 +61,30 @@ async function attemptMarkTaggingStatus(
 export class OpenAiWorker {
   static build() {
     logger.info("Starting inference worker ...");
-    const worker = new Worker<ZOpenAIRequest, void>(
-      OpenAIQueue.name,
-      runOpenAI,
+    const worker = new Runner<ZOpenAIRequest>(
+      OpenAIQueue,
       {
-        connection: queueConnectionDetails,
-        autorun: false,
+        run: runOpenAI,
+        onComplete: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[inference][${jobId}] Completed successfully`);
+          await attemptMarkTaggingStatus(job?.data, "success");
+        },
+        onError: async (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(
+            `[inference][${jobId}] inference job failed: ${job.error}`,
+          );
+          await attemptMarkTaggingStatus(job?.data, "failure");
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[inference][${jobId}] Completed successfully`);
-      attemptMarkTaggingStatus(job?.data, "success");
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[inference][${jobId}] inference job failed: ${error}`);
-      attemptMarkTaggingStatus(job?.data, "failure");
-    });
-
     return worker;
   }
 }
@@ -361,7 +362,7 @@ async function connectTags(
   });
 }
 
-async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
   const jobId = job.id ?? "unknown";
 
   const inferenceClient = InferenceClientFactory.build();
diff --git a/apps/workers/package.json b/apps/workers/package.json
index b74f9ec9..471606f2 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -7,10 +7,10 @@
     "@hoarder/db": "workspace:^0.1.0",
     "@hoarder/shared": "workspace:^0.1.0",
     "@hoarder/tsconfig": "workspace:^0.1.0",
+    "@hoarder/queue": "workspace:^0.1.0",
     "@mozilla/readability": "^0.5.0",
     "@tsconfig/node21": "^21.0.1",
     "async-mutex": "^0.4.1",
-    "bullmq": "^5.1.9",
     "dompurify": "^3.0.9",
     "dotenv": "^16.4.1",
     "drizzle-orm": "^0.29.4",
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
index 75c057c6..56c2aac4 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/searchWorker.ts
@@ -1,13 +1,11 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
 import { eq } from "drizzle-orm";
 
 import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
 import { db } from "@hoarder/db";
 import { bookmarks } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
 import logger from "@hoarder/shared/logger";
 import {
-  queueConnectionDetails,
   SearchIndexingQueue,
   zSearchIndexingRequestSchema,
 } from "@hoarder/shared/queues";
@@ -16,25 +14,28 @@ import { getSearchIdxClient } from "@hoarder/shared/search";
 export class SearchIndexingWorker {
   static build() {
     logger.info("Starting search indexing worker ...");
-    const worker = new Worker<ZSearchIndexingRequest, void>(
-      SearchIndexingQueue.name,
-      runSearchIndexing,
+    const worker = new Runner<ZSearchIndexingRequest>(
+      SearchIndexingQueue,
       {
-        connection: queueConnectionDetails,
-        autorun: false,
+        run: runSearchIndexing,
+        onComplete: (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.info(`[search][${jobId}] Completed successfully`);
+          return Promise.resolve();
+        },
+        onError: (job) => {
+          const jobId = job?.id ?? "unknown";
+          logger.error(`[search][${jobId}] search job failed: ${job.error}`);
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
       },
     );
 
-    worker.on("completed", (job) => {
-      const jobId = job?.id ?? "unknown";
-      logger.info(`[search][${jobId}] Completed successfully`);
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id ?? "unknown";
-      logger.error(`[search][${jobId}] search job failed: ${error}`);
-    });
-
     return worker;
   }
 }
@@ -112,7 +113,7 @@ async function runDelete(
   await ensureTaskSuccess(searchClient, task.taskUid);
 }
 
-async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
   const jobId = job.id ?? "unknown";
 
   const request = zSearchIndexingRequestSchema.safeParse(job.data);
```

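The producer side changes in step: jobs are enqueued with the payload alone, and because the queue now lives in SQLite, its schema migrations must run before anything touches it. A sketch under the same assumptions, mirroring `apps/workers/index.ts` and the `OpenAIQueue.enqueue` call in `crawlerWorker.ts`; `bootstrapExample` is a hypothetical helper.

```ts
// Startup and enqueue wiring as shown in the diff above.
import { OpenAIQueue, runQueueDBMigrations } from "@hoarder/shared/queues";

function bootstrapExample(bookmarkId: string) {
  // The sqlite-backed queue persists jobs in database tables, so the queue
  // migrations must run once at process start, before any runner polls.
  runQueueDBMigrations();

  // BullMQ's OpenAIQueue.add("openai", payload) becomes a plain enqueue of
  // the payload; there is no per-job name anymore.
  OpenAIQueue.enqueue({ bookmarkId });
}
```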