diff options
Diffstat (limited to 'workers')
| -rw-r--r-- | workers/crawler.ts | 78 | ||||
| -rw-r--r-- | workers/index.ts | 58 | ||||
| -rw-r--r-- | workers/openai.ts | 163 | ||||
| -rw-r--r-- | workers/package.json | 20 |
4 files changed, 0 insertions, 319 deletions
diff --git a/workers/crawler.ts b/workers/crawler.ts deleted file mode 100644 index 817bba45..00000000 --- a/workers/crawler.ts +++ /dev/null @@ -1,78 +0,0 @@ -import logger from "@remember/shared/logger"; -import { - OpenAIQueue, - ZCrawlLinkRequest, - zCrawlLinkRequestSchema, -} from "@remember/shared/queues"; -import { Job } from "bullmq"; - -import prisma from "@remember/db"; - -import metascraper from "metascraper"; - -const metascraperParser = metascraper([ - require("metascraper-description")(), - require("metascraper-image")(), - require("metascraper-logo-favicon")(), - require("metascraper-title")(), - require("metascraper-url")(), -]); - -export default async function runCrawler(job: Job<ZCrawlLinkRequest, void>) { - const jobId = job.id || "unknown"; - - const request = zCrawlLinkRequestSchema.safeParse(job.data); - if (!request.success) { - logger.error( - `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - return; - } - - const { url, linkId } = request.data; - - logger.info( - `[Crawler][${jobId}] Will crawl "${url}" for link with id "${linkId}"`, - ); - // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc) - - const resp = await fetch(url); - const respBody = await resp.text(); - - const meta = await metascraperParser({ - url, - html: respBody, - }); - - await prisma.bookmarkedLink.update({ - where: { - id: linkId, - }, - data: { - details: { - upsert: { - create: { - title: meta.title, - description: meta.description, - imageUrl: meta.image, - favicon: meta.logo, - }, - update: { - title: meta.title, - description: meta.description, - imageUrl: meta.image, - favicon: meta.logo, - }, - }, - }, - }, - include: { - details: true, - }, - }); - - // Enqueue openai job - OpenAIQueue.add("openai", { - linkId, - }); -} diff --git a/workers/index.ts b/workers/index.ts deleted file mode 100644 index bf092953..00000000 --- a/workers/index.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { Worker } from "bullmq"; - -import { - LinkCrawlerQueue, - OpenAIQueue, - ZCrawlLinkRequest, - ZOpenAIRequest, - queueConnectionDetails, -} from "@remember/shared/queues"; -import logger from "@remember/shared/logger"; -import runCrawler from "./crawler"; -import runOpenAI from "./openai"; - -function crawlerWorker() { - logger.info("Starting crawler worker ..."); - const worker = new Worker<ZCrawlLinkRequest, void>( - LinkCrawlerQueue.name, - runCrawler, - { - connection: queueConnectionDetails, - autorun: false, - }, - ); - - worker.on("completed", (job) => { - const jobId = job?.id || "unknown"; - logger.info(`[Crawler][${jobId}] Completed successfully`); - }); - - worker.on("failed", (job, error) => { - const jobId = job?.id || "unknown"; - logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`); - }); - - return worker; -} - -function openaiWorker() { - logger.info("Starting openai worker ..."); - const worker = new Worker<ZOpenAIRequest, void>(OpenAIQueue.name, runOpenAI, { - connection: queueConnectionDetails, - autorun: false, - }); - - worker.on("completed", (job) => { - const jobId = job?.id || "unknown"; - logger.info(`[openai][${jobId}] Completed successfully`); - }); - - worker.on("failed", (job, error) => { - const jobId = job?.id || "unknown"; - logger.error(`[openai][${jobId}] openai job failed: ${error}`); - }); - - return worker; -} - -await Promise.all([crawlerWorker().run(), openaiWorker().run()]); diff --git a/workers/openai.ts b/workers/openai.ts deleted file mode 100644 index 893aa1af..00000000 --- a/workers/openai.ts +++ /dev/null @@ -1,163 +0,0 @@ -import prisma, { BookmarkedLink, BookmarkedLinkDetails } from "@remember/db"; -import logger from "@remember/shared/logger"; -import { ZOpenAIRequest, zOpenAIRequestSchema } from "@remember/shared/queues"; -import { Job } from "bullmq"; -import OpenAI from "openai"; -import { z } from "zod"; - -const openAIResponseSchema = z.object({ - tags: z.array(z.string()), -}); - -let openai: OpenAI | undefined; - -if (process.env.OPENAI_API_KEY && process.env.OPENAI_ENABLED) { - openai = new OpenAI({ - apiKey: process.env["OPENAI_API_KEY"], // This is the default and can be omitted - }); -} - -function buildPrompt(url: string, description: string) { - return ` -You are a bot who given an article, extracts relevant "hashtags" out of them. -You must respond in JSON with the key "tags" and the value is list of tags. ----- -URL: ${url} -Description: ${description} - `; -} - -async function fetchLink(linkId: string) { - return await prisma.bookmarkedLink.findUnique({ - where: { - id: linkId, - }, - include: { - details: true, - }, - }); -} - -async function inferTags( - jobId: string, - link: BookmarkedLink, - linkDetails: BookmarkedLinkDetails | null, - openai: OpenAI, -) { - const linkDescription = linkDetails?.description; - if (!linkDescription) { - throw new Error( - `[openai][${jobId}] No description found for link "${link.id}". Skipping ...`, - ); - } - - const chatCompletion = await openai.chat.completions.create({ - messages: [ - { role: "system", content: buildPrompt(link.url, linkDescription) }, - ], - model: "gpt-3.5-turbo-0125", - response_format: { type: "json_object" }, - }); - - let response = chatCompletion.choices[0].message.content; - if (!response) { - throw new Error(`[openai][${jobId}] Got no message content from OpenAI`); - } - - try { - let tags = openAIResponseSchema.parse(JSON.parse(response)).tags; - logger.info( - `[openai][${jobId}] Inferring tag for url "${link.url}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`, - ); - - // Sometimes the tags contain the hashtag symbol, let's strip them out if they do. - tags = tags.map((t) => { - if (t.startsWith("#")) { - return t.slice(1); - } - return t; - }); - - return tags; - } catch (e) { - throw new Error( - `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`, - ); - } -} - -async function createTags(tags: string[], userId: string) { - const existingTags = await prisma.bookmarkTags.findMany({ - select: { - id: true, - name: true, - }, - where: { - userId, - name: { - in: tags, - }, - }, - }); - - const existingTagSet = new Set<string>(existingTags.map((t) => t.name)); - - let newTags = tags.filter((t) => !existingTagSet.has(t)); - - // TODO: Prisma doesn't support createMany in Sqlite - let newTagObjects = await Promise.all( - newTags.map((t) => { - return prisma.bookmarkTags.create({ - data: { - name: t, - userId: userId, - }, - }); - }), - ); - - return existingTags.map((t) => t.id).concat(newTagObjects.map((t) => t.id)); -} - -async function connectTags(linkId: string, tagIds: string[]) { - // TODO: Prisma doesn't support createMany in Sqlite - await Promise.all( - tagIds.map((tagId) => { - return prisma.tagsOnLinks.create({ - data: { - tagId, - linkId, - }, - }); - }), - ); -} - -export default async function runOpenAI(job: Job<ZOpenAIRequest, void>) { - const jobId = job.id || "unknown"; - - if (!openai) { - logger.debug( - `[openai][${jobId}] OpenAI is not configured, nothing to do now`, - ); - return; - } - - const request = zOpenAIRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - const { linkId } = request.data; - const link = await fetchLink(linkId); - if (!link) { - throw new Error(`[openai][${jobId}] link with id ${linkId} was not found`); - } - - const tags = await inferTags(jobId, link, link.details, openai); - - const tagIds = await createTags(tags, link.userId); - await connectTags(linkId, tagIds); -} diff --git a/workers/package.json b/workers/package.json deleted file mode 100644 index 1d32f499..00000000 --- a/workers/package.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "$schema": "https://json.schemastore.org/package.json", - "name": "@remember/workers", - "version": "0.1.0", - "private": true, - "dependencies": { - "@remember/shared": "workspace:*", - "metascraper": "^5.43.4", - "metascraper-description": "^5.43.4", - "metascraper-image": "^5.43.4", - "metascraper-logo": "^5.43.4", - "metascraper-title": "^5.43.4", - "metascraper-url": "^5.43.4", - "metascraper-logo-favicon": "^5.43.4", - "openai": "^4.26.1" - }, - "devDependencies": { - "@types/metascraper": "^5.14.3" - } -} |
