| | | |
|---|---|---|
| author | MohamedBassem <me@mbassem.com> | 2024-02-11 16:53:17 +0000 |
| committer | MohamedBassem <me@mbassem.com> | 2024-02-11 17:57:46 +0000 |
| commit | 230cafb6dfc8d3bad57d84ef13c3669f5bf5331a (patch) | |
| tree | b59f4b386201f9fedde3c7b7546f32c2ed3f61cb /packages/workers/crawler.ts | |
| parent | 2c2d05fd0a2c3c26d765f8a6beb88d907a097c1d (diff) | |
| download | karakeep-230cafb6dfc8d3bad57d84ef13c3669f5bf5331a.tar.zst | |
fix: Fix build for workers package and add it to CI
Diffstat (limited to 'packages/workers/crawler.ts')
| | | |
|---|---|---|
| -rw-r--r-- | packages/workers/crawler.ts | 50 |

1 file changed, 42 insertions(+), 8 deletions(-)
```diff
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
index 1cb82f31..45d2f530 100644
--- a/packages/workers/crawler.ts
+++ b/packages/workers/crawler.ts
@@ -1,12 +1,16 @@
 import logger from "@remember/shared/logger";
 import {
+  LinkCrawlerQueue,
   OpenAIQueue,
   ZCrawlLinkRequest,
+  queueConnectionDetails,
   zCrawlLinkRequestSchema,
 } from "@remember/shared/queues";
+
+import { Worker } from "bullmq";
 import { Job } from "bullmq";
 
-import prisma from "@remember/db";
+import { prisma } from "@remember/db";
 import { Browser } from "puppeteer";
 import puppeteer from "puppeteer-extra";
 
@@ -32,14 +36,44 @@ const metascraperParser = metascraper([
   metascraperUrl(),
 ]);
 
-let browser: Browser;
-(async () => {
-  puppeteer.use(StealthPlugin());
-  // TODO: Configure the browser mode via an env variable
-  browser = await puppeteer.launch({ headless: true });
-})();
+let browser: Browser | undefined;
+
+export class CrawlerWorker {
+  static async build() {
+    if (!browser) {
+      puppeteer.use(StealthPlugin());
+      console.log("HERE");
+      browser = await puppeteer.launch({ headless: true });
+    }
+
+    logger.info("Starting crawler worker ...");
+    const worker = new Worker<ZCrawlLinkRequest, void>(
+      LinkCrawlerQueue.name,
+      runCrawler,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[Crawler][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
 
 async function crawlPage(url: string) {
+  if (!browser) {
+    throw new Error("The browser must have been initalized by this point.");
+  }
   const context = await browser.createBrowserContext();
   const page = await context.newPage();
 
@@ -53,7 +87,7 @@ async function crawlPage(url: string) {
   return htmlContent;
 }
 
-export default async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
   const jobId = job.id || "unknown";
 
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
```
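The behavioral change that fixes the build is that the worker no longer starts as a side effect of importing the module: the Puppeteer launch and the BullMQ `Worker` construction are deferred into `CrawlerWorker.build()`, and because the worker is created with `autorun: false`, it only begins consuming jobs when `run()` is called. A minimal sketch of a caller, assuming a hypothetical entry point that is not part of this diff:

```typescript
// Hypothetical entry point (not in this commit): start the crawler worker.
import { CrawlerWorker } from "./crawler";

async function main() {
  // build() lazily launches the shared Puppeteer browser on first call,
  // then returns a BullMQ Worker bound to the LinkCrawlerQueue.
  const worker = await CrawlerWorker.build();

  // The worker was constructed with `autorun: false`, so it does not
  // process jobs until run() is invoked explicitly.
  await worker.run();
}

main().catch((err) => {
  console.error("Crawler worker crashed:", err);
  process.exit(1);
});
```

Deferring startup this way also explains the new `if (!browser)` guard in `crawlPage`: since `browser` is now `Browser | undefined` until `build()` runs, the guard narrows the type and fails fast if crawling is attempted before initialization.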
