author    MohamedBassem <me@mbassem.com>    2024-03-13 21:43:44 +0000
committer Mohamed Bassem <me@mbassem.com>   2024-03-14 16:40:45 +0000
commit    04572a8e5081b1e4871e273cde9dbaaa44c52fe0 (patch)
tree      8e993acb732a50d1306d4d6953df96c165c57f57 /apps/workers
parent    2df08ed08c065e8b91bc8df0266bd4bcbb062be4 (diff)
download  karakeep-04572a8e5081b1e4871e273cde9dbaaa44c52fe0.tar.zst
structure: Create apps dir and copy tooling dir from t3-turbo repo
Diffstat (limited to 'apps/workers')
-rw-r--r--  apps/workers/crawlerWorker.ts   201
-rw-r--r--  apps/workers/index.ts            16
-rw-r--r--  apps/workers/openaiWorker.ts    263
-rw-r--r--  apps/workers/package.json        56
-rw-r--r--  apps/workers/searchWorker.ts    116
-rw-r--r--  apps/workers/tsconfig.json       10
6 files changed, 662 insertions, 0 deletions
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
new file mode 100644
index 00000000..5db2da7b
--- /dev/null
+++ b/apps/workers/crawlerWorker.ts
@@ -0,0 +1,201 @@
+import logger from "@hoarder/shared/logger";
+import {
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ SearchIndexingQueue,
+ ZCrawlLinkRequest,
+ queueConnectionDetails,
+ zCrawlLinkRequestSchema,
+} from "@hoarder/shared/queues";
+import DOMPurify from "dompurify";
+import { JSDOM } from "jsdom";
+
+import { Worker } from "bullmq";
+import { Job } from "bullmq";
+
+import { db } from "@hoarder/db";
+
+import { Browser } from "puppeteer";
+import puppeteer from "puppeteer-extra";
+import StealthPlugin from "puppeteer-extra-plugin-stealth";
+import AdblockerPlugin from "puppeteer-extra-plugin-adblocker";
+
+import metascraper from "metascraper";
+
+import metascraperDescription from "metascraper-description";
+import metascraperImage from "metascraper-image";
+import metascraperLogo from "metascraper-logo-favicon";
+import metascraperTitle from "metascraper-title";
+import metascraperUrl from "metascraper-url";
+import metascraperTwitter from "metascraper-twitter";
+import metascraperReadability from "metascraper-readability";
+import { Mutex } from "async-mutex";
+import assert from "assert";
+import serverConfig from "@hoarder/shared/config";
+import { bookmarkLinks } from "@hoarder/db/schema";
+import { eq } from "drizzle-orm";
+import { Readability } from "@mozilla/readability";
+
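+// The metascraper pipeline used to extract page metadata (title, description,
+// social image, favicon, canonical URL) from the crawled HTML.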
+const metascraperParser = metascraper([
+ metascraperReadability(),
+ metascraperTitle(),
+ metascraperDescription(),
+ metascraperTwitter(),
+ metascraperImage(),
+ metascraperLogo(),
+ metascraperUrl(),
+]);
+
+let browser: Browser | undefined;
+// Guards the interactions with the browser instance.
+// This is needed given that most of the browser APIs are async.
+const browserMutex = new Mutex();
+
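+// Launches the shared browser instance. The "disconnected" handler below
+// re-invokes this function, so a crashed or killed browser relaunches itself.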
+async function launchBrowser() {
+ browser = undefined;
+ await browserMutex.runExclusive(async () => {
+ browser = await puppeteer.launch({
+ headless: serverConfig.crawler.headlessBrowser,
+ executablePath: serverConfig.crawler.browserExecutablePath,
+ userDataDir: serverConfig.crawler.browserUserDataDir,
+ });
+ browser.on("disconnected", async () => {
+ logger.info(
+ "The puppeteer browser got disconnected. Will attempt to launch it again.",
+ );
+ await launchBrowser();
+ });
+ });
+}
+
+export class CrawlerWorker {
+ static async build() {
+ puppeteer.use(StealthPlugin());
+ puppeteer.use(
+ AdblockerPlugin({
+ blockTrackersAndAnnoyances: true,
+ }),
+ );
+ await launchBrowser();
+
+ logger.info("Starting crawler worker ...");
+ const worker = new Worker<ZCrawlLinkRequest, void>(
+ LinkCrawlerQueue.name,
+ runCrawler,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[Crawler][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+ });
+
+ return worker;
+ }
+}
+
+async function getBookmarkUrl(bookmarkId: string) {
+ const bookmark = await db.query.bookmarkLinks.findFirst({
+ where: eq(bookmarkLinks.id, bookmarkId),
+ });
+
+ if (!bookmark) {
+ throw new Error("The bookmark either doesn't exist or not a link");
+ }
+ return bookmark.url;
+}
+
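+// Renders the page in a throwaway browser context so cookies and storage
+// don't leak between crawl jobs, then returns the fully rendered HTML.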
+async function crawlPage(url: string) {
+ assert(browser);
+ const context = await browser.createBrowserContext();
+
+ try {
+ const page = await context.newPage();
+
+ await page.goto(url, {
+ timeout: 10000, // 10 seconds
+ });
+
+ // Wait until there are at most two in-flight connections for 1 second,
+ // but give up on the idle wait after 5 seconds.
+ await Promise.race([
+ page.waitForNetworkIdle({
+ idleTime: 1000, // 1 sec
+ concurrency: 2,
+ }),
+ new Promise((f) => setTimeout(f, 5000)),
+ ]);
+
+ const htmlContent = await page.content();
+ return htmlContent;
+ } finally {
+ await context.close();
+ }
+}
+
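+// Job entrypoint: validates the request, crawls the page, extracts metadata
+// and readable content, persists them, and fans out to the tagging and
+// search indexing queues.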
+async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+ const jobId = job.id || "unknown";
+
+ const request = zCrawlLinkRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ logger.error(
+ `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ return;
+ }
+
+ const { bookmarkId } = request.data;
+ const url = await getBookmarkUrl(bookmarkId);
+
+ logger.info(
+ `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
+ );
+ // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc)
+
+ const htmlContent = await crawlPage(url);
+
+ const meta = await metascraperParser({
+ url,
+ html: htmlContent,
+ });
+
+ const window = new JSDOM("").window;
+ const purify = DOMPurify(window);
+ const purifiedHTML = purify.sanitize(htmlContent);
+ const purifiedDOM = new JSDOM(purifiedHTML, { url });
+ const readableContent = new Readability(purifiedDOM.window.document).parse();
+
+ // TODO(important): Restrict the size of content to store
+
+ await db
+ .update(bookmarkLinks)
+ .set({
+ title: meta.title,
+ description: meta.description,
+ imageUrl: meta.image,
+ favicon: meta.logo,
+ content: readableContent?.textContent,
+ htmlContent: readableContent?.content,
+ crawledAt: new Date(),
+ })
+ .where(eq(bookmarkLinks.id, bookmarkId));
+
+ // Enqueue openai job
+ await OpenAIQueue.add("openai", {
+ bookmarkId,
+ });
+
+ // Update the search index
+ await SearchIndexingQueue.add("search_indexing", {
+ bookmarkId,
+ type: "index",
+ });
+}
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
new file mode 100644
index 00000000..5b6b62d5
--- /dev/null
+++ b/apps/workers/index.ts
@@ -0,0 +1,16 @@
+import "dotenv/config";
+import { CrawlerWorker } from "./crawlerWorker";
+import { OpenAiWorker } from "./openaiWorker";
+import { SearchIndexingWorker } from "./searchWorker";
+
+async function main() {
+ const [crawler, openai, search] = [
+ await CrawlerWorker.build(),
+ await OpenAiWorker.build(),
+ await SearchIndexingWorker.build(),
+ ];
+
+ await Promise.all([crawler.run(), openai.run(), search.run()]);
+}
+
+main().catch((e) => {
+ console.error(`Workers crashed: ${e}`);
+ process.exit(1);
+});
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
new file mode 100644
index 00000000..1ec22d32
--- /dev/null
+++ b/apps/workers/openaiWorker.ts
@@ -0,0 +1,263 @@
+import { db } from "@hoarder/db";
+import logger from "@hoarder/shared/logger";
+import serverConfig from "@hoarder/shared/config";
+import {
+ OpenAIQueue,
+ SearchIndexingQueue,
+ ZOpenAIRequest,
+ queueConnectionDetails,
+ zOpenAIRequestSchema,
+} from "@hoarder/shared/queues";
+import { Job } from "bullmq";
+import OpenAI from "openai";
+import { z } from "zod";
+import { Worker } from "bullmq";
+import { bookmarkTags, bookmarks, tagsOnBookmarks } from "@hoarder/db/schema";
+import { and, eq, inArray } from "drizzle-orm";
+
+const openAIResponseSchema = z.object({
+ tags: z.array(z.string()),
+});
+
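+// Best-effort update of the bookmark's tagging status. Failures here are
+// logged and swallowed so they don't mask the outcome of the job itself.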
+async function attemptMarkTaggingStatus(
+ jobData: object | undefined,
+ status: "success" | "failure",
+) {
+ if (!jobData) {
+ return;
+ }
+ try {
+ const request = zOpenAIRequestSchema.parse(jobData);
+ await db
+ .update(bookmarks)
+ .set({
+ taggingStatus: status,
+ })
+ .where(eq(bookmarks.id, request.bookmarkId));
+ } catch (e) {
+ logger.error(`Something went wrong when marking the tagging status: ${e}`);
+ }
+}
+
+export class OpenAiWorker {
+ static async build() {
+ logger.info("Starting openai worker ...");
+ const worker = new Worker<ZOpenAIRequest, void>(
+ OpenAIQueue.name,
+ runOpenAI,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", async (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[openai][${jobId}] Completed successfully`);
+ await attemptMarkTaggingStatus(job?.data, "success");
+ });
+
+ worker.on("failed", async (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[openai][${jobId}] openai job failed: ${error}`);
+ await attemptMarkTaggingStatus(job?.data, "failure");
+ });
+
+ return worker;
+ }
+}
+
+const PROMPT_BASE = `
+I'm building a read-it-later app and I need your help with automatic tagging.
+Please analyze the text after the sentence "CONTENT START HERE:" and suggest relevant tags that describe its key themes, topics, and main ideas.
+Aim for a variety of tags, including broad categories, specific keywords, and potential sub-genres. If it's a famous website,
+you may also include a tag for the website. Tags should be lowercase and must not contain spaces. If a tag is not generic enough, don't
+include it. Aim for 3-5 tags. If there are no good tags, don't emit any. The content can include text for cookie consent and privacy policies; ignore those while tagging.
+You must respond in JSON with the key "tags" whose value is a list of tags.
+CONTENT START HERE:
+`;
+
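+// Builds the tagging prompt for either a link bookmark (URL, title,
+// description and truncated page content) or a plain-text bookmark.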
+function buildPrompt(
+ bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
+) {
+ if (bookmark.link) {
+ if (!bookmark.link.description && !bookmark.link.content) {
+ throw new Error(
+ `No content found for link "${bookmark.id}". Skipping ...`,
+ );
+ }
+
+ let content = bookmark.link.content;
+ if (content) {
+ let words = content.split(" ");
+ if (words.length > 2000) {
+ // Keep only the first 2000 words to stay within the model's context window.
+ words = words.slice(0, 2000);
+ content = words.join(" ");
+ }
+ }
+ return `
+${PROMPT_BASE}
+URL: ${bookmark.link.url}
+Title: ${bookmark.link.title || ""}
+Description: ${bookmark.link.description || ""}
+Content: ${content || ""}
+ `;
+ }
+
+ if (bookmark.text) {
+ // TODO: Ensure that the content doesn't exceed the context length of openai
+ return `
+${PROMPT_BASE}
+${bookmark.text.text}
+ `;
+ }
+
+ throw new Error("Unknown bookmark type");
+}
+
+async function fetchBookmark(linkId: string) {
+ return await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, linkId),
+ with: {
+ link: true,
+ text: true,
+ },
+ });
+}
+
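+// Asks the model for tags in JSON mode and validates the response against
+// openAIResponseSchema before returning the tag list.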
+async function inferTags(
+ jobId: string,
+ bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
+ openai: OpenAI,
+) {
+ const chatCompletion = await openai.chat.completions.create({
+ messages: [{ role: "system", content: buildPrompt(bookmark) }],
+ model: "gpt-3.5-turbo-0125",
+ response_format: { type: "json_object" },
+ });
+
+ const response = chatCompletion.choices[0].message.content;
+ if (!response) {
+ throw new Error(`[openai][${jobId}] Got no message content from OpenAI`);
+ }
+
+ try {
+ let tags = openAIResponseSchema.parse(JSON.parse(response)).tags;
+ logger.info(
+ `[openai][${jobId}] Inferring tags for bookmark "${bookmark.id}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`,
+ );
+
+ // Sometimes the tags come back with a leading hashtag symbol; strip it if present.
+ tags = tags.map((t) => {
+ if (t.startsWith("#")) {
+ return t.slice(1);
+ }
+ return t;
+ });
+
+ return tags;
+ } catch (e) {
+ throw new Error(
+ `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`,
+ );
+ }
+}
+
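+// Within one transaction: creates any missing tags for the user, removes the
+// bookmark's previously AI-attached tags, and attaches the new set.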
+async function connectTags(
+ bookmarkId: string,
+ newTags: string[],
+ userId: string,
+) {
+ if (newTags.length === 0) {
+ return;
+ }
+
+ await db.transaction(async (tx) => {
+ // Create tags that didn't exist previously
+ await tx
+ .insert(bookmarkTags)
+ .values(
+ newTags.map((t) => ({
+ name: t,
+ userId,
+ })),
+ )
+ .onConflictDoNothing();
+
+ const newTagIds = (
+ await tx.query.bookmarkTags.findMany({
+ where: and(
+ eq(bookmarkTags.userId, userId),
+ inArray(bookmarkTags.name, newTags),
+ ),
+ columns: {
+ id: true,
+ },
+ })
+ ).map((r) => r.id);
+
+ // Delete old AI tags
+ await tx
+ .delete(tagsOnBookmarks)
+ .where(
+ and(
+ eq(tagsOnBookmarks.attachedBy, "ai"),
+ eq(tagsOnBookmarks.bookmarkId, bookmarkId),
+ ),
+ );
+
+ // Attach new ones
+ await tx
+ .insert(tagsOnBookmarks)
+ .values(
+ newTagIds.map((tagId) => ({
+ tagId,
+ bookmarkId,
+ attachedBy: "ai" as const,
+ })),
+ )
+ .onConflictDoNothing();
+ });
+}
+
+async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+ const jobId = job.id || "unknown";
+
+ const { openAI } = serverConfig;
+
+ if (!openAI.apiKey) {
+ logger.debug(
+ `[openai][${jobId}] OpenAI is not configured, nothing to do now`,
+ );
+ return;
+ }
+
+ const openai = new OpenAI({
+ apiKey: openAI.apiKey,
+ });
+
+ const request = zOpenAIRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ throw new Error(
+ `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ }
+
+ const { bookmarkId } = request.data;
+ const bookmark = await fetchBookmark(bookmarkId);
+ if (!bookmark) {
+ throw new Error(
+ `[openai][${jobId}] bookmark with id ${bookmarkId} was not found`,
+ );
+ }
+
+ const tags = await inferTags(jobId, bookmark, openai);
+
+ await connectTags(bookmarkId, tags, bookmark.userId);
+
+ // Update the search index
+ SearchIndexingQueue.add("search_indexing", {
+ bookmarkId,
+ type: "index",
+ });
+}
diff --git a/apps/workers/package.json b/apps/workers/package.json
new file mode 100644
index 00000000..8446a54d
--- /dev/null
+++ b/apps/workers/package.json
@@ -0,0 +1,56 @@
+{
+ "$schema": "https://json.schemastore.org/package.json",
+ "name": "@hoarder/workers",
+ "version": "0.1.0",
+ "private": true,
+ "type": "module",
+ "dependencies": {
+ "@hoarder/db": "workspace:*",
+ "@hoarder/shared": "workspace:*",
+ "@mozilla/readability": "^0.5.0",
+ "@tsconfig/node21": "^21.0.1",
+ "async-mutex": "^0.4.1",
+ "bullmq": "^5.1.9",
+ "dompurify": "^3.0.9",
+ "dotenv": "^16.4.1",
+ "drizzle-orm": "^0.29.4",
+ "jsdom": "^24.0.0",
+ "metascraper": "^5.43.4",
+ "metascraper-description": "^5.43.4",
+ "metascraper-image": "^5.43.4",
+ "metascraper-logo": "^5.43.4",
+ "metascraper-logo-favicon": "^5.43.4",
+ "metascraper-readability": "^5.43.4",
+ "metascraper-title": "^5.43.4",
+ "metascraper-twitter": "^5.43.4",
+ "metascraper-url": "^5.43.4",
+ "openai": "^4.29.0",
+ "puppeteer": "^22.0.0",
+ "puppeteer-extra": "^3.3.6",
+ "puppeteer-extra-plugin-adblocker": "^2.13.6",
+ "puppeteer-extra-plugin-stealth": "^2.11.2",
+ "tsx": "^4.7.1",
+ "typescript": "^5.3.3",
+ "zod": "^3.22.4"
+ },
+ "devDependencies": {
+ "@hoarder/eslint-config": "workspace:^0.2.0",
+ "@hoarder/prettier-config": "workspace:^0.1.0",
+ "@hoarder/tsconfig": "workspace:^0.1.0",
+ "@types/dompurify": "^3.0.5",
+ "@types/jsdom": "^21.1.6",
+ "@types/metascraper": "^5.14.3"
+ },
+ "scripts": {
+ "start": "tsx watch index.ts",
+ "start:prod": "tsx index.ts",
+ "typecheck": "tsc --noEmit"
+ },
+ "eslintConfig": {
+ "root": true,
+ "extends": [
+ "@hoarder/eslint-config/base"
+ ]
+ },
+ "prettier": "@hoarder/prettier-config"
+}
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
new file mode 100644
index 00000000..618e7c89
--- /dev/null
+++ b/apps/workers/searchWorker.ts
@@ -0,0 +1,116 @@
+import { db } from "@hoarder/db";
+import logger from "@hoarder/shared/logger";
+import { getSearchIdxClient } from "@hoarder/shared/search";
+import {
+ SearchIndexingQueue,
+ ZSearchIndexingRequest,
+ queueConnectionDetails,
+ zSearchIndexingRequestSchema,
+} from "@hoarder/shared/queues";
+import { Job } from "bullmq";
+import { Worker } from "bullmq";
+import { bookmarks } from "@hoarder/db/schema";
+import { eq } from "drizzle-orm";
+
+export class SearchIndexingWorker {
+ static async build() {
+ logger.info("Starting search indexing worker ...");
+ const worker = new Worker<ZSearchIndexingRequest, void>(
+ SearchIndexingQueue.name,
+ runSearchIndexing,
+ {
+ connection: queueConnectionDetails,
+ autorun: false,
+ },
+ );
+
+ worker.on("completed", (job) => {
+ const jobId = job?.id || "unknown";
+ logger.info(`[search][${jobId}] Completed successfully`);
+ });
+
+ worker.on("failed", (job, error) => {
+ const jobId = job?.id || "unknown";
+ logger.error(`[search][${jobId}] search indexing job failed: ${error}`);
+ });
+
+ return worker;
+ }
+}
+
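+// Loads the bookmark along with its link/text payload and tags, and upserts
+// a single document for it into the search index.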
+async function runIndex(
+ searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+ bookmarkId: string,
+) {
+ const bookmark = await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, bookmarkId),
+ with: {
+ link: true,
+ text: true,
+ tagsOnBookmarks: {
+ with: {
+ tag: true,
+ },
+ },
+ },
+ });
+
+ if (!bookmark) {
+ throw new Error(`Bookmark ${bookmarkId} not found`);
+ }
+
+ await searchClient.addDocuments([
+ {
+ id: bookmark.id,
+ userId: bookmark.userId,
+ ...(bookmark.link
+ ? {
+ url: bookmark.link.url,
+ title: bookmark.link.title,
+ description: bookmark.link.description,
+ content: bookmark.link.content,
+ }
+ : undefined),
+ ...(bookmark.text ? { content: bookmark.text.text } : undefined),
+ tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
+ },
+ ]);
+}
+
+async function runDelete(
+ searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+ bookmarkId: string,
+) {
+ await searchClient.deleteDocument(bookmarkId);
+}
+
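+// Job entrypoint: validates the request and dispatches to indexing or
+// deletion. No-ops (with a debug log) when search isn't configured.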
+async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+ const jobId = job.id || "unknown";
+
+ const request = zSearchIndexingRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ throw new Error(
+ `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ }
+
+ const searchClient = await getSearchIdxClient();
+ if (!searchClient) {
+ logger.debug(
+ `[search][${jobId}] Search is not configured, nothing to do now`,
+ );
+ return;
+ }
+
+ const bookmarkId = request.data.bookmarkId;
+ switch (request.data.type) {
+ case "index": {
+ await runIndex(searchClient, bookmarkId);
+ break;
+ }
+ case "delete": {
+ await runDelete(searchClient, bookmarkId);
+ break;
+ }
+ }
+}
diff --git a/apps/workers/tsconfig.json b/apps/workers/tsconfig.json
new file mode 100644
index 00000000..24b9a10d
--- /dev/null
+++ b/apps/workers/tsconfig.json
@@ -0,0 +1,10 @@
+{
+ "$schema": "https://json.schemastore.org/tsconfig",
+ "extends": "@hoarder/tsconfig/node.json",
+ "include": ["**/*.ts"],
+ "exclude": ["node_modules"],
+ "compilerOptions": {
+ "baseUrl": ".",
+ "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
+ }
+}