author     Mohamed Bassem <me@mbassem.com>   2024-07-21 19:18:58 +0100
committer  GitHub <noreply@github.com>       2024-07-21 19:18:58 +0100
commit     9edd154440c18bcc4542560e229eb293f9e0c2d4 (patch)
tree       2423f82619d48656f8dc60870fab8b152eef4401 /apps
parent     edbd98d7841388d1169a3a3b159367487bda431e (diff)
download   karakeep-9edd154440c18bcc4542560e229eb293f9e0c2d4.tar.zst
refactor: Replace the usage of bullMQ with the hoarder sqlite-based queue (#309)
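
Every worker follows the same pattern: the bullMQ Worker (queue name, processor function, and Redis connection details) is replaced by a Runner from @hoarder/queue that takes the queue object itself, a handler bundle (run, onComplete, onError), and polling options. Below is a minimal sketch of the new consumer shape, condensed from the hunks in this diff; the ZExamplePayload type is an illustrative stand-in, and only the options actually visible in the diff are shown.

import { DequeuedJob, Runner } from "@hoarder/queue";
import { OpenAIQueue } from "@hoarder/shared/queues";
import logger from "@hoarder/shared/logger";

// Illustrative stand-in for a real payload type such as ZOpenAIRequest.
interface ZExamplePayload {
  bookmarkId: string;
}

const worker = new Runner<ZExamplePayload>(
  // The queue object itself, instead of a queue name plus queueConnectionDetails.
  OpenAIQueue,
  {
    run: async (job: DequeuedJob<ZExamplePayload>) => {
      // Process job.data here.
    },
    onComplete: async (job) => {
      logger.info(`[example][${job?.id ?? "unknown"}] Completed successfully`);
    },
    // Failures surface on the job itself (job.error) rather than as a second callback argument.
    onError: async (job) => {
      logger.error(`[example][${job?.id ?? "unknown"}] job failed: ${job.error}`);
    },
  },
  {
    concurrency: 1,
    pollIntervalMs: 1000, // the sqlite-backed queue is polled rather than pushed over Redis
    timeoutSecs: 30,
  },
);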
Diffstat (limited to 'apps')
-rw-r--r--  apps/web/components/dashboard/admin/ServerStats.tsx |  6
-rw-r--r--  apps/workers/crawlerWorker.ts                       | 60
-rw-r--r--  apps/workers/index.ts                               |  3
-rw-r--r--  apps/workers/openaiWorker.ts                        | 43
-rw-r--r--  apps/workers/package.json                           |  2
-rw-r--r--  apps/workers/searchWorker.ts                        | 39
6 files changed, 78 insertions, 75 deletions
diff --git a/apps/web/components/dashboard/admin/ServerStats.tsx b/apps/web/components/dashboard/admin/ServerStats.tsx
index 06e3421f..e95dc437 100644
--- a/apps/web/components/dashboard/admin/ServerStats.tsx
+++ b/apps/web/components/dashboard/admin/ServerStats.tsx
@@ -106,19 +106,19 @@ export default function ServerStats() {
<TableBody>
<TableRow>
<TableCell className="lg:w-2/3">Crawling Jobs</TableCell>
- <TableCell>{serverStats.crawlStats.queuedInRedis}</TableCell>
+ <TableCell>{serverStats.crawlStats.queued}</TableCell>
<TableCell>{serverStats.crawlStats.pending}</TableCell>
<TableCell>{serverStats.crawlStats.failed}</TableCell>
</TableRow>
<TableRow>
<TableCell>Indexing Jobs</TableCell>
- <TableCell>{serverStats.indexingStats.queuedInRedis}</TableCell>
+ <TableCell>{serverStats.indexingStats.queued}</TableCell>
<TableCell>-</TableCell>
<TableCell>-</TableCell>
</TableRow>
<TableRow>
<TableCell>Inference Jobs</TableCell>
- <TableCell>{serverStats.inferenceStats.queuedInRedis}</TableCell>
+ <TableCell>{serverStats.inferenceStats.queued}</TableCell>
<TableCell>{serverStats.inferenceStats.pending}</TableCell>
<TableCell>{serverStats.inferenceStats.failed}</TableCell>
</TableRow>
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index ddf61fc8..a1917523 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -1,11 +1,9 @@
import assert from "assert";
import * as dns from "dns";
import * as path from "node:path";
-import type { Job } from "bullmq";
import type { Browser } from "puppeteer";
import { Readability } from "@mozilla/readability";
import { Mutex } from "async-mutex";
-import { Worker } from "bullmq";
import DOMPurify from "dompurify";
import { eq } from "drizzle-orm";
import { execa } from "execa";
@@ -34,6 +32,7 @@ import {
bookmarkLinks,
bookmarks,
} from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
import {
ASSET_TYPES,
deleteAsset,
@@ -48,7 +47,6 @@ import logger from "@hoarder/shared/logger";
import {
LinkCrawlerQueue,
OpenAIQueue,
- queueConnectionDetails,
triggerSearchReindex,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
@@ -153,37 +151,37 @@ export class CrawlerWorker {
}
logger.info("Starting crawler worker ...");
- const worker = new Worker<ZCrawlLinkRequest, void>(
- LinkCrawlerQueue.name,
- withTimeout(
- runCrawler,
- /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
- ),
+ const worker = new Runner<ZCrawlLinkRequest>(
+ LinkCrawlerQueue,
{
+ run: withTimeout(
+ runCrawler,
+ /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
+ ),
+ onComplete: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[Crawler][${jobId}] Completed successfully`);
+ const bookmarkId = job?.data.bookmarkId;
+ if (bookmarkId) {
+ await changeBookmarkStatus(bookmarkId, "success");
+ }
+ },
+ onError: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(`[Crawler][${jobId}] Crawling job failed: ${job.error}`);
+ const bookmarkId = job.data?.bookmarkId;
+ if (bookmarkId) {
+ await changeBookmarkStatus(bookmarkId, "failure");
+ }
+ },
+ },
+ {
+ pollIntervalMs: 1000,
+ timeoutSecs: serverConfig.crawler.jobTimeoutSec,
concurrency: serverConfig.crawler.numWorkers,
- connection: queueConnectionDetails,
- autorun: false,
},
);
- worker.on("completed", (job) => {
- const jobId = job?.id ?? "unknown";
- logger.info(`[Crawler][${jobId}] Completed successfully`);
- const bookmarkId = job?.data.bookmarkId;
- if (bookmarkId) {
- changeBookmarkStatus(bookmarkId, "success");
- }
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id ?? "unknown";
- logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
- const bookmarkId = job?.data.bookmarkId;
- if (bookmarkId) {
- changeBookmarkStatus(bookmarkId, "failure");
- }
- });
-
return worker;
}
}
@@ -600,7 +598,7 @@ async function crawlAndParseUrl(
};
}
-async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
const jobId = job.id ?? "unknown";
const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -655,7 +653,7 @@ async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
- OpenAIQueue.add("openai", {
+ OpenAIQueue.enqueue({
bookmarkId,
});
}
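
The producer side is equally mechanical: bullMQ's Queue.add(jobName, payload) becomes enqueue(payload) on the sqlite-backed queue, with no job name or Redis connection involved. A small sketch of the call changed in the hunk above; the bookmarkId value is illustrative.

import { OpenAIQueue } from "@hoarder/shared/queues";

const bookmarkId = "example-bookmark-id"; // illustrative value

// Before (bullMQ):              OpenAIQueue.add("openai", { bookmarkId });
// After (sqlite-based queue):   the payload is enqueued directly, no job name.
OpenAIQueue.enqueue({ bookmarkId });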
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 687d9ced..39741aa8 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,6 +2,7 @@ import "dotenv/config";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
+import { runQueueDBMigrations } from "@hoarder/shared/queues";
import { CrawlerWorker } from "./crawlerWorker";
import { shutdownPromise } from "./exit";
@@ -10,6 +11,8 @@ import { SearchIndexingWorker } from "./searchWorker";
async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
+ runQueueDBMigrations();
+
const [crawler, openai, search] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
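
One new requirement comes with the sqlite-backed queue: runQueueDBMigrations() is called at the top of main(), presumably so the queue tables exist before any Runner starts polling. A hedged sketch of the resulting startup order; imports and worker names follow the existing file, and the openaiWorker import path is assumed.

import { runQueueDBMigrations } from "@hoarder/shared/queues";

import { CrawlerWorker } from "./crawlerWorker";
import { OpenAiWorker } from "./openaiWorker"; // path assumed, not visible in this diff
import { SearchIndexingWorker } from "./searchWorker";

async function main() {
  // Create or upgrade the sqlite queue tables before any worker begins polling.
  runQueueDBMigrations();

  const [crawler, openai, search] = [
    await CrawlerWorker.build(),
    OpenAiWorker.build(),
    SearchIndexingWorker.build(),
  ];
  // ... start the workers and wait on shutdownPromise, as in the existing file.
}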
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 776d6828..9e6e2f23 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -1,5 +1,3 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
import { and, Column, eq, inArray, sql } from "drizzle-orm";
import { z } from "zod";
@@ -11,12 +9,12 @@ import {
bookmarkTags,
tagsOnBookmarks,
} from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
import { readAsset } from "@hoarder/shared/assetdb";
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
OpenAIQueue,
- queueConnectionDetails,
triggerSearchReindex,
zOpenAIRequestSchema,
} from "@hoarder/shared/queues";
@@ -63,27 +61,30 @@ async function attemptMarkTaggingStatus(
export class OpenAiWorker {
static build() {
logger.info("Starting inference worker ...");
- const worker = new Worker<ZOpenAIRequest, void>(
- OpenAIQueue.name,
- runOpenAI,
+ const worker = new Runner<ZOpenAIRequest>(
+ OpenAIQueue,
{
- connection: queueConnectionDetails,
- autorun: false,
+ run: runOpenAI,
+ onComplete: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[inference][${jobId}] Completed successfully`);
+ await attemptMarkTaggingStatus(job?.data, "success");
+ },
+ onError: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(
+ `[inference][${jobId}] inference job failed: ${job.error}`,
+ );
+ await attemptMarkTaggingStatus(job?.data, "failure");
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
);
- worker.on("completed", (job) => {
- const jobId = job?.id ?? "unknown";
- logger.info(`[inference][${jobId}] Completed successfully`);
- attemptMarkTaggingStatus(job?.data, "success");
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id ?? "unknown";
- logger.error(`[inference][${jobId}] inference job failed: ${error}`);
- attemptMarkTaggingStatus(job?.data, "failure");
- });
-
return worker;
}
}
@@ -361,7 +362,7 @@ async function connectTags(
});
}
-async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
const jobId = job.id ?? "unknown";
const inferenceClient = InferenceClientFactory.build();
diff --git a/apps/workers/package.json b/apps/workers/package.json
index b74f9ec9..471606f2 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -7,10 +7,10 @@
"@hoarder/db": "workspace:^0.1.0",
"@hoarder/shared": "workspace:^0.1.0",
"@hoarder/tsconfig": "workspace:^0.1.0",
+ "@hoarder/queue": "workspace:^0.1.0",
"@mozilla/readability": "^0.5.0",
"@tsconfig/node21": "^21.0.1",
"async-mutex": "^0.4.1",
- "bullmq": "^5.1.9",
"dompurify": "^3.0.9",
"dotenv": "^16.4.1",
"drizzle-orm": "^0.29.4",
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
index 75c057c6..56c2aac4 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/searchWorker.ts
@@ -1,13 +1,11 @@
-import type { Job } from "bullmq";
-import { Worker } from "bullmq";
import { eq } from "drizzle-orm";
import type { ZSearchIndexingRequest } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import { bookmarks } from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
import logger from "@hoarder/shared/logger";
import {
- queueConnectionDetails,
SearchIndexingQueue,
zSearchIndexingRequestSchema,
} from "@hoarder/shared/queues";
@@ -16,25 +14,28 @@ import { getSearchIdxClient } from "@hoarder/shared/search";
export class SearchIndexingWorker {
static build() {
logger.info("Starting search indexing worker ...");
- const worker = new Worker<ZSearchIndexingRequest, void>(
- SearchIndexingQueue.name,
- runSearchIndexing,
+ const worker = new Runner<ZSearchIndexingRequest>(
+ SearchIndexingQueue,
{
- connection: queueConnectionDetails,
- autorun: false,
+ run: runSearchIndexing,
+ onComplete: (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[search][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(`[search][${jobId}] search job failed: ${job.error}`);
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
},
);
- worker.on("completed", (job) => {
- const jobId = job?.id ?? "unknown";
- logger.info(`[search][${jobId}] Completed successfully`);
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id ?? "unknown";
- logger.error(`[search][${jobId}] search job failed: ${error}`);
- });
-
return worker;
}
}
@@ -112,7 +113,7 @@ async function runDelete(
await ensureTaskSuccess(searchClient, task.taskUid);
}
-async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+async function runSearchIndexing(job: DequeuedJob<ZSearchIndexingRequest>) {
const jobId = job.id ?? "unknown";
const request = zSearchIndexingRequestSchema.safeParse(job.data);