Diffstat (limited to 'apps/workers/crawlerWorker.ts')
-rw-r--r--  apps/workers/crawlerWorker.ts  60
1 file changed, 29 insertions(+), 31 deletions(-)
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index ddf61fc8..a1917523 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -1,11 +1,9 @@
import assert from "assert";
import * as dns from "dns";
import * as path from "node:path";
-import type { Job } from "bullmq";
import type { Browser } from "puppeteer";
import { Readability } from "@mozilla/readability";
import { Mutex } from "async-mutex";
-import { Worker } from "bullmq";
import DOMPurify from "dompurify";
import { eq } from "drizzle-orm";
import { execa } from "execa";
@@ -34,6 +32,7 @@ import {
bookmarkLinks,
bookmarks,
} from "@hoarder/db/schema";
+import { DequeuedJob, Runner } from "@hoarder/queue";
import {
ASSET_TYPES,
deleteAsset,
@@ -48,7 +47,6 @@ import logger from "@hoarder/shared/logger";
import {
LinkCrawlerQueue,
OpenAIQueue,
- queueConnectionDetails,
triggerSearchReindex,
zCrawlLinkRequestSchema,
} from "@hoarder/shared/queues";
@@ -153,37 +151,37 @@ export class CrawlerWorker {
}
logger.info("Starting crawler worker ...");
- const worker = new Worker<ZCrawlLinkRequest, void>(
- LinkCrawlerQueue.name,
- withTimeout(
- runCrawler,
- /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
- ),
+ const worker = new Runner<ZCrawlLinkRequest>(
+ LinkCrawlerQueue,
{
+ run: withTimeout(
+ runCrawler,
+ /* timeoutSec */ serverConfig.crawler.jobTimeoutSec,
+ ),
+ onComplete: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.info(`[Crawler][${jobId}] Completed successfully`);
+ const bookmarkId = job?.data.bookmarkId;
+ if (bookmarkId) {
+ await changeBookmarkStatus(bookmarkId, "success");
+ }
+ },
+ onError: async (job) => {
+ const jobId = job?.id ?? "unknown";
+ logger.error(`[Crawler][${jobId}] Crawling job failed: ${job.error}`);
+ const bookmarkId = job.data?.bookmarkId;
+ if (bookmarkId) {
+ await changeBookmarkStatus(bookmarkId, "failure");
+ }
+ },
+ },
+ {
+ pollIntervalMs: 1000,
+ timeoutSecs: serverConfig.crawler.jobTimeoutSec,
concurrency: serverConfig.crawler.numWorkers,
- connection: queueConnectionDetails,
- autorun: false,
},
);
- worker.on("completed", (job) => {
- const jobId = job?.id ?? "unknown";
- logger.info(`[Crawler][${jobId}] Completed successfully`);
- const bookmarkId = job?.data.bookmarkId;
- if (bookmarkId) {
- changeBookmarkStatus(bookmarkId, "success");
- }
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id ?? "unknown";
- logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
- const bookmarkId = job?.data.bookmarkId;
- if (bookmarkId) {
- changeBookmarkStatus(bookmarkId, "failure");
- }
- });
-
return worker;
}
}
@@ -600,7 +598,7 @@ async function crawlAndParseUrl(
};
}
-async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
const jobId = job.id ?? "unknown";
const request = zCrawlLinkRequestSchema.safeParse(job.data);
@@ -655,7 +653,7 @@ async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
- OpenAIQueue.add("openai", {
+ OpenAIQueue.enqueue({
bookmarkId,
});
}
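
The diff replaces BullMQ's Worker with the Runner abstraction from @hoarder/queue: completion and failure handling move from Worker event listeners ("completed"/"failed") into the onComplete/onError callbacks passed at construction, and the Redis connection options (queueConnectionDetails, autorun) disappear in favor of a pollIntervalMs setting, suggesting the Runner polls its queue rather than holding a broker connection. For orientation, the call sites above imply roughly the following surface for the package. This is a sketch reconstructed from usage in this diff alone, not the package's actual declarations; every name, optionality, and return type below is an assumption.

// @hoarder/queue surface as inferred from the call sites in this diff.
// Illustrative reconstruction only: names, optionality, and return
// types are assumptions, not the package's real declarations.

export interface DequeuedJob<T> {
  id?: string;   // `job?.id ?? "unknown"` above implies id can be absent
  data: T;       // the enqueued payload, e.g. ZCrawlLinkRequest
  error?: Error; // read as `job.error` inside the onError callback
}

export interface RunnerFuncs<T> {
  run: (job: DequeuedJob<T>) => Promise<void>;
  onComplete?: (job: DequeuedJob<T>) => Promise<void>;
  onError?: (job: DequeuedJob<T>) => Promise<void>;
}

export interface RunnerOptions {
  pollIntervalMs: number; // idle workers poll the queue at this interval
  timeoutSecs: number;    // per-job timeout enforced by the runner
  concurrency: number;    // maximum jobs processed in parallel
}

export declare class Queue<T> {
  name: string;
  // Replaces BullMQ's `queue.add(name, payload)`:
  enqueue(payload: T): Promise<void>;
}

export declare class Runner<T> {
  constructor(queue: Queue<T>, funcs: RunnerFuncs<T>, opts: RunnerOptions);
}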