diff options
| author | MohamedBassem <me@mbassem.com> | 2024-02-11 16:53:17 +0000 |
|---|---|---|
| committer | MohamedBassem <me@mbassem.com> | 2024-02-11 17:57:46 +0000 |
| commit | 230cafb6dfc8d3bad57d84ef13c3669f5bf5331a (patch) | |
| tree | b59f4b386201f9fedde3c7b7546f32c2ed3f61cb /packages/workers | |
| parent | 2c2d05fd0a2c3c26d765f8a6beb88d907a097c1d (diff) | |
| download | karakeep-230cafb6dfc8d3bad57d84ef13c3669f5bf5331a.tar.zst | |
fix: Fix build for workers package and add it to CI
Diffstat (limited to 'packages/workers')
| -rw-r--r-- | packages/workers/crawler.ts | 50 | ||||
| -rw-r--r-- | packages/workers/index.ts | 67 | ||||
| -rw-r--r-- | packages/workers/openai.ts | 34 | ||||
| -rw-r--r-- | packages/workers/package.json | 5 | ||||
| -rw-r--r-- | packages/workers/tsconfig.json | 6 |
5 files changed, 92 insertions, 70 deletions
```diff
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
index 1cb82f31..45d2f530 100644
--- a/packages/workers/crawler.ts
+++ b/packages/workers/crawler.ts
@@ -1,12 +1,16 @@
 import logger from "@remember/shared/logger";
 import {
+  LinkCrawlerQueue,
   OpenAIQueue,
   ZCrawlLinkRequest,
+  queueConnectionDetails,
   zCrawlLinkRequestSchema,
 } from "@remember/shared/queues";
+
+import { Worker } from "bullmq";
 import { Job } from "bullmq";
 
-import prisma from "@remember/db";
+import { prisma } from "@remember/db";
 
 import { Browser } from "puppeteer";
 import puppeteer from "puppeteer-extra";
@@ -32,14 +36,44 @@ const metascraperParser = metascraper([
   metascraperUrl(),
 ]);
 
-let browser: Browser;
-(async () => {
-  puppeteer.use(StealthPlugin());
-  // TODO: Configure the browser mode via an env variable
-  browser = await puppeteer.launch({ headless: true });
-})();
+let browser: Browser | undefined;
+
+export class CrawlerWorker {
+  static async build() {
+    if (!browser) {
+      puppeteer.use(StealthPlugin());
+      console.log("HERE");
+      browser = await puppeteer.launch({ headless: true });
+    }
+
+    logger.info("Starting crawler worker ...");
+    const worker = new Worker<ZCrawlLinkRequest, void>(
+      LinkCrawlerQueue.name,
+      runCrawler,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[Crawler][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
 
 async function crawlPage(url: string) {
+  if (!browser) {
+    throw new Error("The browser must have been initalized by this point.");
+  }
   const context = await browser.createBrowserContext();
   const page = await context.newPage();
 
@@ -53,7 +87,7 @@ async function crawlPage(url: string) {
   return htmlContent;
 }
 
-export default async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
   const jobId = job.id || "unknown";
 
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
index d16c42eb..a58b2edf 100644
--- a/packages/workers/index.ts
+++ b/packages/workers/index.ts
@@ -1,65 +1,16 @@
-import { Worker } from "bullmq";
-
 import dotenv from "dotenv";
-
-import {
-  LinkCrawlerQueue,
-  OpenAIQueue,
-  ZCrawlLinkRequest,
-  ZOpenAIRequest,
-  queueConnectionDetails,
-} from "@remember/shared/queues";
-import logger from "@remember/shared/logger";
-import runCrawler from "./crawler";
-import runOpenAI from "./openai";
-
-function crawlerWorker() {
-  logger.info("Starting crawler worker ...");
-  const worker = new Worker<ZCrawlLinkRequest, void>(
-    LinkCrawlerQueue.name,
-    runCrawler,
-    {
-      connection: queueConnectionDetails,
-      autorun: false,
-    },
-  );
-
-  worker.on("completed", (job) => {
-    const jobId = job?.id || "unknown";
-    logger.info(`[Crawler][${jobId}] Completed successfully`);
-  });
-
-  worker.on("failed", (job, error) => {
-    const jobId = job?.id || "unknown";
-    logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
-  });
-
-  return worker;
-}
-
-function openaiWorker() {
-  logger.info("Starting openai worker ...");
-  const worker = new Worker<ZOpenAIRequest, void>(OpenAIQueue.name, runOpenAI, {
-    connection: queueConnectionDetails,
-    autorun: false,
-  });
-
-  worker.on("completed", (job) => {
-    const jobId = job?.id || "unknown";
-    logger.info(`[openai][${jobId}] Completed successfully`);
-  });
-
-  worker.on("failed", (job, error) => {
-    const jobId = job?.id || "unknown";
-    logger.error(`[openai][${jobId}] openai job failed: ${error}`);
-  });
-
-  return worker;
-}
+import { CrawlerWorker } from "./crawler";
+import { OpenAiWorker } from "./openai";
 
 async function main() {
   dotenv.config();
-  await Promise.all([crawlerWorker().run(), openaiWorker().run()]);
+
+  const [crawler, openai] = [
+    await CrawlerWorker.build(),
+    await OpenAiWorker.build(),
+  ];
+
+  await Promise.all([crawler.run(), openai.run()]);
 }
 
 main();
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
index a2f90c8a..999f2827 100644
--- a/packages/workers/openai.ts
+++ b/packages/workers/openai.ts
@@ -1,13 +1,41 @@
-import prisma, { BookmarkedLink } from "@remember/db";
+import { prisma, BookmarkedLink } from "@remember/db";
 import logger from "@remember/shared/logger";
-import { ZOpenAIRequest, zOpenAIRequestSchema } from "@remember/shared/queues";
+import { OpenAIQueue, ZOpenAIRequest, queueConnectionDetails, zOpenAIRequestSchema } from "@remember/shared/queues";
 import { Job } from "bullmq";
 import OpenAI from "openai";
 import { z } from "zod";
+import { Worker } from "bullmq";
 
 const openAIResponseSchema = z.object({
   tags: z.array(z.string()),
 });
+
+
+export class OpenAiWorker {
+  static async build() {
+    logger.info("Starting openai worker ...");
+    const worker = new Worker<ZOpenAIRequest, void>(
+      OpenAIQueue.name,
+      runOpenAI,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[openai][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[openai][${jobId}] openai job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
 
 function buildPrompt(url: string, description: string) {
   return `
@@ -121,7 +149,7 @@ async function connectTags(bookmarkId: string, tagIds: string[]) {
   );
 }
 
-export default async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
   const jobId = job.id || "unknown";
 
   if (!process.env.OPENAI_API_KEY || !process.env.OPENAI_ENABLED) {
diff --git a/packages/workers/package.json b/packages/workers/package.json
index 65648f4e..4c012143 100644
--- a/packages/workers/package.json
+++ b/packages/workers/package.json
@@ -4,6 +4,7 @@
   "version": "0.1.0",
   "private": true,
   "dependencies": {
+    "@remember/db": "0.1.0",
     "@remember/shared": "0.1.0",
     "dotenv": "^16.4.1",
     "metascraper": "^5.43.4",
@@ -21,10 +22,12 @@
     "puppeteer-extra-plugin-stealth": "^2.11.2"
   },
   "devDependencies": {
+    "@tsconfig/node21": "^21.0.1",
     "@types/metascraper": "^5.14.3",
     "ts-node": "^10.9.2"
   },
   "scripts": {
-    "start": "ts-node index.ts"
+    "start": "ts-node index.ts",
+    "typecheck": "tsc --noEmit"
   }
 }
diff --git a/packages/workers/tsconfig.json b/packages/workers/tsconfig.json
new file mode 100644
index 00000000..5ab467a9
--- /dev/null
+++ b/packages/workers/tsconfig.json
@@ -0,0 +1,6 @@
+{
+  "$schema": "https://json.schemastore.org/tsconfig",
+  "extends": "@tsconfig/node21/tsconfig.json",
+  "include": ["**/*.ts"],
+  "exclude": ["node_modules"]
+}
```
