aboutsummaryrefslogtreecommitdiffstats
path: root/packages/workers
diff options
context:
space:
mode:
authorMohamedBassem <me@mbassem.com>2024-02-08 15:14:23 +0000
committerMohamedBassem <me@mbassem.com>2024-02-08 15:15:21 +0000
commit80bb8a108f29331cdb2f2695f6801beee104dc89 (patch)
treeb1ae2a512963a9c916c4bfed71f7633f508de131 /packages/workers
parent333429adbaaa592cc96b480a5228f0e3f1de4cc2 (diff)
downloadkarakeep-80bb8a108f29331cdb2f2695f6801beee104dc89.tar.zst
[refactor] Move the different packages to the package subdir
Diffstat (limited to 'packages/workers')
-rw-r--r--packages/workers/crawler.ts78
-rw-r--r--packages/workers/index.ts58
-rw-r--r--packages/workers/openai.ts163
-rw-r--r--packages/workers/package.json20
4 files changed, 319 insertions, 0 deletions
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
new file mode 100644
index 00000000..817bba45
--- /dev/null
+++ b/packages/workers/crawler.ts
@@ -0,0 +1,78 @@
import { Job } from "bullmq";
import metascraper from "metascraper";
import metascraperDescription from "metascraper-description";
import metascraperImage from "metascraper-image";
import metascraperLogoFavicon from "metascraper-logo-favicon";
import metascraperTitle from "metascraper-title";
import metascraperUrl from "metascraper-url";

import prisma from "@remember/db";
import logger from "@remember/shared/logger";
import {
  OpenAIQueue,
  ZCrawlLinkRequest,
  zCrawlLinkRequestSchema,
} from "@remember/shared/queues";
+
+const metascraperParser = metascraper([
+ require("metascraper-description")(),
+ require("metascraper-image")(),
+ require("metascraper-logo-favicon")(),
+ require("metascraper-title")(),
+ require("metascraper-url")(),
+]);
+
+export default async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+ const jobId = job.id || "unknown";
+
+ const request = zCrawlLinkRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ logger.error(
+ `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ return;
+ }
+
+ const { url, linkId } = request.data;
+
+ logger.info(
+ `[Crawler][${jobId}] Will crawl "${url}" for link with id "${linkId}"`,
+ );
+ // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc)
+
+ const resp = await fetch(url);
+ const respBody = await resp.text();
+
+ const meta = await metascraperParser({
+ url,
+ html: respBody,
+ });
+
+ await prisma.bookmarkedLink.update({
+ where: {
+ id: linkId,
+ },
+ data: {
+ details: {
+ upsert: {
+ create: {
+ title: meta.title,
+ description: meta.description,
+ imageUrl: meta.image,
+ favicon: meta.logo,
+ },
+ update: {
+ title: meta.title,
+ description: meta.description,
+ imageUrl: meta.image,
+ favicon: meta.logo,
+ },
+ },
+ },
+ },
+ include: {
+ details: true,
+ },
+ });
+
+ // Enqueue openai job
+ OpenAIQueue.add("openai", {
+ linkId,
+ });
+}
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
new file mode 100644
index 00000000..bf092953
--- /dev/null
+++ b/packages/workers/index.ts
@@ -0,0 +1,58 @@
+import { Worker } from "bullmq";
+
+import {
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ ZCrawlLinkRequest,
+ ZOpenAIRequest,
+ queueConnectionDetails,
+} from "@remember/shared/queues";
+import logger from "@remember/shared/logger";
+import runCrawler from "./crawler";
+import runOpenAI from "./openai";
+
+function crawlerWorker() {
+ logger.info("Starting crawler worker ...");
+ const worker = new Worker<ZCrawlLinkRequest, void>(
+ LinkCrawlerQueue.name,
+ runCrawler,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[Crawler][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+ });
+
+ return worker;
+}
+
+function openaiWorker() {
+ logger.info("Starting openai worker ...");
+ const worker = new Worker<ZOpenAIRequest, void>(OpenAIQueue.name, runOpenAI, {
+ connection: queueConnectionDetails,
+ autorun: false,
+ });
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[openai][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[openai][${jobId}] openai job failed: ${error}`);
+ });
+
+ return worker;
+}
+
// Run both workers concurrently; the top-level await keeps the process alive
// while they run, and a rejection from either run() terminates module evaluation.
await Promise.all([crawlerWorker().run(), openaiWorker().run()]);
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
new file mode 100644
index 00000000..893aa1af
--- /dev/null
+++ b/packages/workers/openai.ts
@@ -0,0 +1,163 @@
+import prisma, { BookmarkedLink, BookmarkedLinkDetails } from "@remember/db";
+import logger from "@remember/shared/logger";
+import { ZOpenAIRequest, zOpenAIRequestSchema } from "@remember/shared/queues";
+import { Job } from "bullmq";
+import OpenAI from "openai";
+import { z } from "zod";
+
// Expected shape of the model's JSON reply (see buildPrompt's instructions).
const openAIResponseSchema = z.object({
  tags: z.array(z.string()),
});

// OpenAI client; stays undefined when not configured, in which case
// runOpenAI becomes a no-op.
let openai: OpenAI | undefined;

// NOTE(review): this is a truthiness check, so OPENAI_ENABLED="false" (or any
// non-empty string) still enables the client — confirm that is intended.
if (process.env.OPENAI_API_KEY && process.env.OPENAI_ENABLED) {
  openai = new OpenAI({
    apiKey: process.env["OPENAI_API_KEY"], // This is the default and can be omitted
  });
}
+
+function buildPrompt(url: string, description: string) {
+ return `
+You are a bot who given an article, extracts relevant "hashtags" out of them.
+You must respond in JSON with the key "tags" and the value is list of tags.
+----
+URL: ${url}
+Description: ${description}
+ `;
+}
+
+async function fetchLink(linkId: string) {
+ return await prisma.bookmarkedLink.findUnique({
+ where: {
+ id: linkId,
+ },
+ include: {
+ details: true,
+ },
+ });
+}
+
+async function inferTags(
+ jobId: string,
+ link: BookmarkedLink,
+ linkDetails: BookmarkedLinkDetails | null,
+ openai: OpenAI,
+) {
+ const linkDescription = linkDetails?.description;
+ if (!linkDescription) {
+ throw new Error(
+ `[openai][${jobId}] No description found for link "${link.id}". Skipping ...`,
+ );
+ }
+
+ const chatCompletion = await openai.chat.completions.create({
+ messages: [
+ { role: "system", content: buildPrompt(link.url, linkDescription) },
+ ],
+ model: "gpt-3.5-turbo-0125",
+ response_format: { type: "json_object" },
+ });
+
+ let response = chatCompletion.choices[0].message.content;
+ if (!response) {
+ throw new Error(`[openai][${jobId}] Got no message content from OpenAI`);
+ }
+
+ try {
+ let tags = openAIResponseSchema.parse(JSON.parse(response)).tags;
+ logger.info(
+ `[openai][${jobId}] Inferring tag for url "${link.url}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`,
+ );
+
+ // Sometimes the tags contain the hashtag symbol, let's strip them out if they do.
+ tags = tags.map((t) => {
+ if (t.startsWith("#")) {
+ return t.slice(1);
+ }
+ return t;
+ });
+
+ return tags;
+ } catch (e) {
+ throw new Error(
+ `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`,
+ );
+ }
+}
+
+async function createTags(tags: string[], userId: string) {
+ const existingTags = await prisma.bookmarkTags.findMany({
+ select: {
+ id: true,
+ name: true,
+ },
+ where: {
+ userId,
+ name: {
+ in: tags,
+ },
+ },
+ });
+
+ const existingTagSet = new Set<string>(existingTags.map((t) => t.name));
+
+ let newTags = tags.filter((t) => !existingTagSet.has(t));
+
+ // TODO: Prisma doesn't support createMany in Sqlite
+ let newTagObjects = await Promise.all(
+ newTags.map((t) => {
+ return prisma.bookmarkTags.create({
+ data: {
+ name: t,
+ userId: userId,
+ },
+ });
+ }),
+ );
+
+ return existingTags.map((t) => t.id).concat(newTagObjects.map((t) => t.id));
+}
+
+async function connectTags(linkId: string, tagIds: string[]) {
+ // TODO: Prisma doesn't support createMany in Sqlite
+ await Promise.all(
+ tagIds.map((tagId) => {
+ return prisma.tagsOnLinks.create({
+ data: {
+ tagId,
+ linkId,
+ },
+ });
+ }),
+ );
+}
+
+export default async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+ const jobId = job.id || "unknown";
+
+ if (!openai) {
+ logger.debug(
+ `[openai][${jobId}] OpenAI is not configured, nothing to do now`,
+ );
+ return;
+ }
+
+ const request = zOpenAIRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ throw new Error(
+ `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ }
+
+ const { linkId } = request.data;
+ const link = await fetchLink(linkId);
+ if (!link) {
+ throw new Error(`[openai][${jobId}] link with id ${linkId} was not found`);
+ }
+
+ const tags = await inferTags(jobId, link, link.details, openai);
+
+ const tagIds = await createTags(tags, link.userId);
+ await connectTags(linkId, tagIds);
+}
diff --git a/packages/workers/package.json b/packages/workers/package.json
new file mode 100644
index 00000000..e1407912
--- /dev/null
+++ b/packages/workers/package.json
@@ -0,0 +1,20 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@remember/workers",
+ "version": "0.1.0",
+ "private": true,
+ "dependencies": {
+ "@remember/shared": "workspace:packages/*",
+ "metascraper": "^5.43.4",
+ "metascraper-description": "^5.43.4",
+ "metascraper-image": "^5.43.4",
+ "metascraper-logo": "^5.43.4",
+ "metascraper-title": "^5.43.4",
+ "metascraper-url": "^5.43.4",
+ "metascraper-logo-favicon": "^5.43.4",
+ "openai": "^4.26.1"
+ },
+ "devDependencies": {
+ "@types/metascraper": "^5.14.3"
+ }
+}