From 80bb8a108f29331cdb2f2695f6801beee104dc89 Mon Sep 17 00:00:00 2001
From: MohamedBassem
Date: Thu, 8 Feb 2024 15:14:23 +0000
Subject: [refactor] Move the different packages to the package subdir

---
 packages/workers/crawler.ts   |  78 ++++++++++++++++++++
 packages/workers/index.ts     |  58 +++++++++++++++
 packages/workers/openai.ts    | 163 ++++++++++++++++++++++++++++++++++++++++++
 packages/workers/package.json |  20 ++++++
 4 files changed, 319 insertions(+)
 create mode 100644 packages/workers/crawler.ts
 create mode 100644 packages/workers/index.ts
 create mode 100644 packages/workers/openai.ts
 create mode 100644 packages/workers/package.json
(limited to 'packages/workers')

diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
new file mode 100644
index 00000000..817bba45
--- /dev/null
+++ b/packages/workers/crawler.ts
@@ -0,0 +1,78 @@
+import logger from "@remember/shared/logger";
+import {
+  OpenAIQueue,
+  ZCrawlLinkRequest,
+  zCrawlLinkRequestSchema,
+} from "@remember/shared/queues";
+import { Job } from "bullmq";
+
+import prisma from "@remember/db";
+
+import metascraper from "metascraper";
+
+const metascraperParser = metascraper([
+  require("metascraper-description")(),
+  require("metascraper-image")(),
+  require("metascraper-logo-favicon")(),
+  require("metascraper-title")(),
+  require("metascraper-url")(),
+]);
+
+export default async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  const request = zCrawlLinkRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    logger.error(
+      `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+    return;
+  }
+
+  const { url, linkId } = request.data;
+
+  logger.info(
+    `[Crawler][${jobId}] Will crawl "${url}" for link with id "${linkId}"`,
+  );
+  // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc)
+
+  const resp = await fetch(url);
+  const respBody = await resp.text();
+
+  const meta = await metascraperParser({
+    url,
+    html: respBody,
+  });
+
+  await prisma.bookmarkedLink.update({
+    where: {
+      id: linkId,
+    },
+    data: {
+      details: {
+        upsert: {
+          create: {
+            title: meta.title,
+            description: meta.description,
+            imageUrl: meta.image,
+            favicon: meta.logo,
+          },
+          update: {
+            title: meta.title,
+            description: meta.description,
+            imageUrl: meta.image,
+            favicon: meta.logo,
+          },
+        },
+      },
+    },
+    include: {
+      details: true,
+    },
+  });
+
+  // Enqueue openai job
+  OpenAIQueue.add("openai", {
+    linkId,
+  });
+}
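
As the TODO in runCrawler notes, the worker currently fetches whatever URL it is given, so a guard against SSRF-style targets (localhost, loopback and private ranges, non-HTTP schemes) belongs in front of the fetch(url) call. The sketch below is illustrative only and not part of this commit: it uses Node's built-in URL parser, the helper name isUrlSafeToCrawl is made up, and the address checks are deliberately minimal rather than exhaustive (a production guard would also resolve the hostname and validate the resulting IP).

// Illustrative sketch (not part of the patch above): reject URLs the crawler
// should refuse to fetch before calling fetch(url).
function isUrlSafeToCrawl(rawUrl: string): boolean {
  try {
    const parsed = new URL(rawUrl);
    // Only plain http(s) targets are crawlable.
    if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
      return false;
    }
    // Deny obvious loopback, link-local and private-range hosts. This list is
    // intentionally small and not a complete SSRF defence.
    const host = parsed.hostname;
    return !(
      host === "localhost" ||
      host === "0.0.0.0" ||
      host === "::1" ||
      host === "[::1]" ||
      host.startsWith("127.") ||
      host.startsWith("10.") ||
      host.startsWith("192.168.") ||
      host.startsWith("169.254.")
    );
  } catch {
    return false; // not a parsable URL at all
  }
}

In runCrawler, such a check would sit just before the fetch, logging an error and returning early when it fails.
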
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
new file mode 100644
index 00000000..bf092953
--- /dev/null
+++ b/packages/workers/index.ts
@@ -0,0 +1,58 @@
+import { Worker } from "bullmq";
+
+import {
+  LinkCrawlerQueue,
+  OpenAIQueue,
+  ZCrawlLinkRequest,
+  ZOpenAIRequest,
+  queueConnectionDetails,
+} from "@remember/shared/queues";
+import logger from "@remember/shared/logger";
+import runCrawler from "./crawler";
+import runOpenAI from "./openai";
+
+function crawlerWorker() {
+  logger.info("Starting crawler worker ...");
+  const worker = new Worker<ZCrawlLinkRequest, void>(
+    LinkCrawlerQueue.name,
+    runCrawler,
+    {
+      connection: queueConnectionDetails,
+      autorun: false,
+    },
+  );
+
+  worker.on("completed", (job) => {
+    const jobId = job?.id || "unknown";
+    logger.info(`[Crawler][${jobId}] Completed successfully`);
+  });
+
+  worker.on("failed", (job, error) => {
+    const jobId = job?.id || "unknown";
+    logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+  });
+
+  return worker;
+}
+
+function openaiWorker() {
+  logger.info("Starting openai worker ...");
+  const worker = new Worker<ZOpenAIRequest, void>(OpenAIQueue.name, runOpenAI, {
+    connection: queueConnectionDetails,
+    autorun: false,
+  });
+
+  worker.on("completed", (job) => {
+    const jobId = job?.id || "unknown";
+    logger.info(`[openai][${jobId}] Completed successfully`);
+  });
+
+  worker.on("failed", (job, error) => {
+    const jobId = job?.id || "unknown";
+    logger.error(`[openai][${jobId}] openai job failed: ${error}`);
+  });
+
+  return worker;
+}
+
+await Promise.all([crawlerWorker().run(), openaiWorker().run()]);
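
The entry point above creates both workers with autorun: false and then drives them explicitly through run(). One thing it does not yet do is react to process termination; a rough, illustrative sketch of wiring SIGTERM/SIGINT to BullMQ's Worker.close() is shown below (the registerShutdown helper is not part of this commit).

import { Worker } from "bullmq";

import logger from "@remember/shared/logger";

// Illustrative sketch only: close the workers on termination signals so
// currently running jobs can finish before the process exits.
function registerShutdown(workers: Worker[]) {
  const shutdown = async (signal: string) => {
    logger.info(`Received ${signal}, shutting down workers ...`);
    await Promise.all(workers.map((worker) => worker.close()));
    process.exit(0);
  };
  process.on("SIGTERM", () => void shutdown("SIGTERM"));
  process.on("SIGINT", () => void shutdown("SIGINT"));
}

In index.ts this would mean keeping the two workers in variables, calling registerShutdown with both, and then awaiting their run() calls as before.
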
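
A note on the openai.ts worker below: because Prisma's createMany is not supported on SQLite (see the TODOs in createTags and connectTags), each new tag and each tag/link connection is inserted with its own create() call inside Promise.all, so a failure partway through can leave a partial set of rows behind. If that matters, the same per-row creates can be grouped into a single prisma.$transaction. The following sketch is illustrative and not part of this commit; newTags and userId stand in for the values computed inside createTags.

import prisma from "@remember/db";

// Illustrative sketch only: run the per-tag creates as one transaction so the
// new tags either all get written or none do.
async function createTagsAtomically(newTags: string[], userId: string) {
  return await prisma.$transaction(
    newTags.map((name) =>
      prisma.bookmarkTags.create({
        data: { name, userId },
      }),
    ),
  );
}

The same pattern applies to connectTags, batching its tagsOnLinks.create calls into one transaction.
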
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
new file mode 100644
index 00000000..893aa1af
--- /dev/null
+++ b/packages/workers/openai.ts
@@ -0,0 +1,163 @@
+import prisma, { BookmarkedLink, BookmarkedLinkDetails } from "@remember/db";
+import logger from "@remember/shared/logger";
+import { ZOpenAIRequest, zOpenAIRequestSchema } from "@remember/shared/queues";
+import { Job } from "bullmq";
+import OpenAI from "openai";
+import { z } from "zod";
+
+const openAIResponseSchema = z.object({
+  tags: z.array(z.string()),
+});
+
+let openai: OpenAI | undefined;
+
+if (process.env.OPENAI_API_KEY && process.env.OPENAI_ENABLED) {
+  openai = new OpenAI({
+    apiKey: process.env["OPENAI_API_KEY"], // This is the default and can be omitted
+  });
+}
+
+function buildPrompt(url: string, description: string) {
+  return `
+You are a bot who given an article, extracts relevant "hashtags" out of them.
+You must respond in JSON with the key "tags" and the value is list of tags.
+----
+URL: ${url}
+Description: ${description}
+  `;
+}
+
+async function fetchLink(linkId: string) {
+  return await prisma.bookmarkedLink.findUnique({
+    where: {
+      id: linkId,
+    },
+    include: {
+      details: true,
+    },
+  });
+}
+
+async function inferTags(
+  jobId: string,
+  link: BookmarkedLink,
+  linkDetails: BookmarkedLinkDetails | null,
+  openai: OpenAI,
+) {
+  const linkDescription = linkDetails?.description;
+  if (!linkDescription) {
+    throw new Error(
+      `[openai][${jobId}] No description found for link "${link.id}". Skipping ...`,
+    );
+  }
+
+  const chatCompletion = await openai.chat.completions.create({
+    messages: [
+      { role: "system", content: buildPrompt(link.url, linkDescription) },
+    ],
+    model: "gpt-3.5-turbo-0125",
+    response_format: { type: "json_object" },
+  });
+
+  let response = chatCompletion.choices[0].message.content;
+  if (!response) {
+    throw new Error(`[openai][${jobId}] Got no message content from OpenAI`);
+  }
+
+  try {
+    let tags = openAIResponseSchema.parse(JSON.parse(response)).tags;
+    logger.info(
+      `[openai][${jobId}] Inferring tag for url "${link.url}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`,
+    );
+
+    // Sometimes the tags contain the hashtag symbol, let's strip them out if they do.
+    tags = tags.map((t) => {
+      if (t.startsWith("#")) {
+        return t.slice(1);
+      }
+      return t;
+    });
+
+    return tags;
+  } catch (e) {
+    throw new Error(
+      `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`,
+    );
+  }
+}
+
+async function createTags(tags: string[], userId: string) {
+  const existingTags = await prisma.bookmarkTags.findMany({
+    select: {
+      id: true,
+      name: true,
+    },
+    where: {
+      userId,
+      name: {
+        in: tags,
+      },
+    },
+  });
+
+  const existingTagSet = new Set(existingTags.map((t) => t.name));
+
+  let newTags = tags.filter((t) => !existingTagSet.has(t));
+
+  // TODO: Prisma doesn't support createMany in Sqlite
+  let newTagObjects = await Promise.all(
+    newTags.map((t) => {
+      return prisma.bookmarkTags.create({
+        data: {
+          name: t,
+          userId: userId,
+        },
+      });
+    }),
+  );
+
+  return existingTags.map((t) => t.id).concat(newTagObjects.map((t) => t.id));
+}
+
+async function connectTags(linkId: string, tagIds: string[]) {
+  // TODO: Prisma doesn't support createMany in Sqlite
+  await Promise.all(
+    tagIds.map((tagId) => {
+      return prisma.tagsOnLinks.create({
+        data: {
+          tagId,
+          linkId,
+        },
+      });
+    }),
+  );
+}
+
+export default async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  if (!openai) {
+    logger.debug(
+      `[openai][${jobId}] OpenAI is not configured, nothing to do now`,
+    );
+    return;
+  }
+
+  const request = zOpenAIRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  const { linkId } = request.data;
+  const link = await fetchLink(linkId);
+  if (!link) {
+    throw new Error(`[openai][${jobId}] link with id ${linkId} was not found`);
+  }
+
+  const tags = await inferTags(jobId, link, link.details, openai);
+
+  const tagIds = await createTags(tags, link.userId);
+  await connectTags(linkId, tagIds);
+}
diff --git a/packages/workers/package.json b/packages/workers/package.json
new file mode 100644
index 00000000..e1407912
--- /dev/null
+++ b/packages/workers/package.json
@@ -0,0 +1,20 @@
+{
+  "$schema": "https://json.schemastore.org/package.json",
+  "name": "@remember/workers",
+  "version": "0.1.0",
+  "private": true,
+  "dependencies": {
+    "@remember/shared": "workspace:packages/*",
+    "metascraper": "^5.43.4",
+    "metascraper-description": "^5.43.4",
+    "metascraper-image": "^5.43.4",
+    "metascraper-logo": "^5.43.4",
+    "metascraper-title": "^5.43.4",
+    "metascraper-url": "^5.43.4",
+    "metascraper-logo-favicon": "^5.43.4",
+    "openai": "^4.26.1"
+  },
+  "devDependencies": {
+    "@types/metascraper": "^5.14.3"
+  }
+}
--
cgit v1.2.3-70-g09d2