Diffstat (limited to 'packages/workers')
-rw-r--r--  packages/workers/crawler.ts    | 50
-rw-r--r--  packages/workers/index.ts      | 67
-rw-r--r--  packages/workers/openai.ts     | 34
-rw-r--r--  packages/workers/package.json  |  5
-rw-r--r--  packages/workers/tsconfig.json |  6
5 files changed, 92 insertions(+), 70 deletions(-)
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
index 1cb82f31..45d2f530 100644
--- a/packages/workers/crawler.ts
+++ b/packages/workers/crawler.ts
@@ -1,12 +1,16 @@
import logger from "@remember/shared/logger";
import {
+ LinkCrawlerQueue,
OpenAIQueue,
ZCrawlLinkRequest,
+ queueConnectionDetails,
zCrawlLinkRequestSchema,
} from "@remember/shared/queues";
+
+import { Worker } from "bullmq";
import { Job } from "bullmq";
-import prisma from "@remember/db";
+import { prisma } from "@remember/db";
import { Browser } from "puppeteer";
import puppeteer from "puppeteer-extra";
@@ -32,14 +36,44 @@ const metascraperParser = metascraper([
metascraperUrl(),
]);
-let browser: Browser;
-(async () => {
- puppeteer.use(StealthPlugin());
- // TODO: Configure the browser mode via an env variable
- browser = await puppeteer.launch({ headless: true });
-})();
+let browser: Browser | undefined;
+
+export class CrawlerWorker {
+ static async build() {
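+    // Launch the shared Puppeteer browser once; subsequent build() calls reuse it.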
+ if (!browser) {
+ puppeteer.use(StealthPlugin());
+      logger.info("Launching puppeteer browser ...");
+ browser = await puppeteer.launch({ headless: true });
+ }
+
+ logger.info("Starting crawler worker ...");
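+    // autorun is disabled so the caller decides when to start draining the queue.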
+ const worker = new Worker<ZCrawlLinkRequest, void>(
+ LinkCrawlerQueue.name,
+ runCrawler,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[Crawler][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+ });
+
+ return worker;
+ }
+}
async function crawlPage(url: string) {
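+  // build() is responsible for launching the browser before any job runs.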
+ if (!browser) {
+    throw new Error("The browser must have been initialized by this point.");
+ }
const context = await browser.createBrowserContext();
const page = await context.newPage();
@@ -53,7 +87,7 @@ async function crawlPage(url: string) {
return htmlContent;
}
-export default async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
const jobId = job.id || "unknown";
const request = zCrawlLinkRequestSchema.safeParse(job.data);
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
index d16c42eb..a58b2edf 100644
--- a/packages/workers/index.ts
+++ b/packages/workers/index.ts
@@ -1,65 +1,16 @@
-import { Worker } from "bullmq";
-
import dotenv from "dotenv";
-
-import {
- LinkCrawlerQueue,
- OpenAIQueue,
- ZCrawlLinkRequest,
- ZOpenAIRequest,
- queueConnectionDetails,
-} from "@remember/shared/queues";
-import logger from "@remember/shared/logger";
-import runCrawler from "./crawler";
-import runOpenAI from "./openai";
-
-function crawlerWorker() {
- logger.info("Starting crawler worker ...");
- const worker = new Worker<ZCrawlLinkRequest, void>(
- LinkCrawlerQueue.name,
- runCrawler,
- {
- connection: queueConnectionDetails,
- autorun: false,
- },
- );
-
- worker.on("completed", (job) => {
- const jobId = job?.id || "unknown";
- logger.info(`[Crawler][${jobId}] Completed successfully`);
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id || "unknown";
- logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
- });
-
- return worker;
-}
-
-function openaiWorker() {
- logger.info("Starting openai worker ...");
- const worker = new Worker<ZOpenAIRequest, void>(OpenAIQueue.name, runOpenAI, {
- connection: queueConnectionDetails,
- autorun: false,
- });
-
- worker.on("completed", (job) => {
- const jobId = job?.id || "unknown";
- logger.info(`[openai][${jobId}] Completed successfully`);
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id || "unknown";
- logger.error(`[openai][${jobId}] openai job failed: ${error}`);
- });
-
- return worker;
-}
+import { CrawlerWorker } from "./crawler";
+import { OpenAiWorker } from "./openai";
async function main() {
dotenv.config();
- await Promise.all([crawlerWorker().run(), openaiWorker().run()]);
+
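+  // Build both workers up front so initialization errors surface before any job runs.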
+  const crawler = await CrawlerWorker.build();
+  const openai = await OpenAiWorker.build();
+
+ await Promise.all([crawler.run(), openai.run()]);
}
main();
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
index a2f90c8a..999f2827 100644
--- a/packages/workers/openai.ts
+++ b/packages/workers/openai.ts
@@ -1,13 +1,41 @@
-import prisma, { BookmarkedLink } from "@remember/db";
+import { prisma, BookmarkedLink } from "@remember/db";
import logger from "@remember/shared/logger";
-import { ZOpenAIRequest, zOpenAIRequestSchema } from "@remember/shared/queues";
+import {
+  OpenAIQueue,
+  ZOpenAIRequest,
+  queueConnectionDetails,
+  zOpenAIRequestSchema,
+} from "@remember/shared/queues";
import { Job } from "bullmq";
import OpenAI from "openai";
import { z } from "zod";
+import { Worker } from "bullmq";
const openAIResponseSchema = z.object({
tags: z.array(z.string()),
});
+
+export class OpenAiWorker {
+ static async build() {
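+    // Same wiring as CrawlerWorker.build(): create the worker with autorun disabled.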
+ logger.info("Starting openai worker ...");
+ const worker = new Worker<ZOpenAIRequest, void>(
+ OpenAIQueue.name,
+ runOpenAI,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[openai][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[openai][${jobId}] openai job failed: ${error}`);
+ });
+
+ return worker;
+ }
+}
function buildPrompt(url: string, description: string) {
return `
@@ -121,7 +149,7 @@ async function connectTags(bookmarkId: string, tagIds: string[]) {
);
}
-export default async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
const jobId = job.id || "unknown";
if (!process.env.OPENAI_API_KEY || !process.env.OPENAI_ENABLED) {
diff --git a/packages/workers/package.json b/packages/workers/package.json
index 65648f4e..4c012143 100644
--- a/packages/workers/package.json
+++ b/packages/workers/package.json
@@ -4,6 +4,7 @@
"version": "0.1.0",
"private": true,
"dependencies": {
+ "@remember/db": "0.1.0",
"@remember/shared": "0.1.0",
"dotenv": "^16.4.1",
"metascraper": "^5.43.4",
@@ -21,10 +22,12 @@
"puppeteer-extra-plugin-stealth": "^2.11.2"
},
"devDependencies": {
+ "@tsconfig/node21": "^21.0.1",
"@types/metascraper": "^5.14.3",
"ts-node": "^10.9.2"
},
"scripts": {
- "start": "ts-node index.ts"
+ "start": "ts-node index.ts",
+ "typecheck": "tsc --noEmit"
}
}
diff --git a/packages/workers/tsconfig.json b/packages/workers/tsconfig.json
new file mode 100644
index 00000000..5ab467a9
--- /dev/null
+++ b/packages/workers/tsconfig.json
@@ -0,0 +1,6 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@tsconfig/node21/tsconfig.json",
+ "include": ["**/*.ts"],
+ "exclude": ["node_modules"]
+}