| author | MohamedBassem <me@mbassem.com> | 2024-03-13 21:43:44 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2024-03-14 16:40:45 +0000 |
| commit | 04572a8e5081b1e4871e273cde9dbaaa44c52fe0 (patch) | |
| tree | 8e993acb732a50d1306d4d6953df96c165c57f57 /apps/workers | |
| parent | 2df08ed08c065e8b91bc8df0266bd4bcbb062be4 (diff) | |
| download | karakeep-04572a8e5081b1e4871e273cde9dbaaa44c52fe0.tar.zst | |
structure: Create apps dir and copy tooling dir from t3-turbo repo
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/crawlerWorker.ts | 201 |
| -rw-r--r-- | apps/workers/index.ts | 16 |
| -rw-r--r-- | apps/workers/openaiWorker.ts | 263 |
| -rw-r--r-- | apps/workers/package.json | 56 |
| -rw-r--r-- | apps/workers/searchWorker.ts | 116 |
| -rw-r--r-- | apps/workers/tsconfig.json | 10 |
6 files changed, 662 insertions, 0 deletions
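All three workers added in the diff below follow the same bullmq pattern: build a `Worker` with `autorun: false`, validate the job payload with a zod schema before acting on it, and let the caller start it later with `run()`. A minimal sketch of that shared shape (the queue name and payload schema here are illustrative, not from the commit; the real schemas live in `@hoarder/shared/queues`):

```ts
import { Job, Worker } from "bullmq";
import { z } from "zod";

// Illustrative payload; each real worker has its own schema.
const payloadSchema = z.object({ bookmarkId: z.string() });

export function buildWorker(connection: { host: string; port: number }) {
  return new Worker(
    "example_queue", // each worker below binds to its own queue's name
    async (job: Job) => {
      const request = payloadSchema.safeParse(job.data);
      if (!request.success) {
        throw new Error(`Malformed job request: ${request.error.toString()}`);
      }
      // ... act on request.data.bookmarkId ...
    },
    { connection, autorun: false }, // the caller starts it via worker.run()
  );
}
```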
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
new file mode 100644
index 00000000..5db2da7b
--- /dev/null
+++ b/apps/workers/crawlerWorker.ts
@@ -0,0 +1,201 @@
+import logger from "@hoarder/shared/logger";
+import {
+  LinkCrawlerQueue,
+  OpenAIQueue,
+  SearchIndexingQueue,
+  ZCrawlLinkRequest,
+  queueConnectionDetails,
+  zCrawlLinkRequestSchema,
+} from "@hoarder/shared/queues";
+import DOMPurify from "dompurify";
+import { JSDOM } from "jsdom";
+
+import { Worker } from "bullmq";
+import { Job } from "bullmq";
+
+import { db } from "@hoarder/db";
+
+import { Browser } from "puppeteer";
+import puppeteer from "puppeteer-extra";
+import StealthPlugin from "puppeteer-extra-plugin-stealth";
+import AdblockerPlugin from "puppeteer-extra-plugin-adblocker";
+
+import metascraper from "metascraper";
+
+import metascraperDescription from "metascraper-description";
+import metascraperImage from "metascraper-image";
+import metascraperLogo from "metascraper-logo-favicon";
+import metascraperTitle from "metascraper-title";
+import metascraperUrl from "metascraper-url";
+import metascraperTwitter from "metascraper-twitter";
+import metascraperReadability from "metascraper-readability";
+import { Mutex } from "async-mutex";
+import assert from "assert";
+import serverConfig from "@hoarder/shared/config";
+import { bookmarkLinks } from "@hoarder/db/schema";
+import { eq } from "drizzle-orm";
+import { Readability } from "@mozilla/readability";
+
+const metascraperParser = metascraper([
+  metascraperReadability(),
+  metascraperTitle(),
+  metascraperDescription(),
+  metascraperTwitter(),
+  metascraperImage(),
+  metascraperLogo(),
+  metascraperUrl(),
+]);
+
+let browser: Browser | undefined;
+// Guards the interactions with the browser instance.
+// This is needed given that most of the browser APIs are async.
+const browserMutex = new Mutex();
+
+async function launchBrowser() {
+  browser = undefined;
+  await browserMutex.runExclusive(async () => {
+    browser = await puppeteer.launch({
+      headless: serverConfig.crawler.headlessBrowser,
+      executablePath: serverConfig.crawler.browserExecutablePath,
+      userDataDir: serverConfig.crawler.browserUserDataDir,
+    });
+    browser.on("disconnected", async () => {
+      logger.info(
+        "The puppeteer browser got disconnected. Will attempt to launch it again.",
+      );
+      await launchBrowser();
+    });
+  });
+}
+
+export class CrawlerWorker {
+  static async build() {
+    puppeteer.use(StealthPlugin());
+    puppeteer.use(
+      AdblockerPlugin({
+        blockTrackersAndAnnoyances: true,
+      }),
+    );
+    await launchBrowser();
+
+    logger.info("Starting crawler worker ...");
+    const worker = new Worker<ZCrawlLinkRequest, void>(
+      LinkCrawlerQueue.name,
+      runCrawler,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[Crawler][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
+
+async function getBookmarkUrl(bookmarkId: string) {
+  const bookmark = await db.query.bookmarkLinks.findFirst({
+    where: eq(bookmarkLinks.id, bookmarkId),
+  });
+
+  if (!bookmark) {
+    throw new Error("The bookmark either doesn't exist or is not a link");
+  }
+  return bookmark.url;
+}
+
+async function crawlPage(url: string) {
+  assert(browser);
+  const context = await browser.createBrowserContext();
+
+  try {
+    const page = await context.newPage();
+
+    await page.goto(url, {
+      timeout: 10000, // 10 seconds
+    });
+
+    // Wait until there are at most two in-flight connections for 1 second,
+    // but give up on network idleness after 5 seconds.
+    await Promise.race([
+      page.waitForNetworkIdle({
+        idleTime: 1000, // 1 sec
+        concurrency: 2,
+      }),
+      new Promise((f) => setTimeout(f, 5000)),
+    ]);
+
+    const htmlContent = await page.content();
+    return htmlContent;
+  } finally {
+    await context.close();
+  }
+}
+
+async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  const request = zCrawlLinkRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    logger.error(
+      `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+    return;
+  }
+
+  const { bookmarkId } = request.data;
+  const url = await getBookmarkUrl(bookmarkId);
+
+  logger.info(
+    `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
+  );
+  // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc.)
+
+  const htmlContent = await crawlPage(url);
+
+  const meta = await metascraperParser({
+    url,
+    html: htmlContent,
+  });
+
+  const window = new JSDOM("").window;
+  const purify = DOMPurify(window);
+  const purifiedHTML = purify.sanitize(htmlContent);
+  const purifiedDOM = new JSDOM(purifiedHTML, { url });
+  const readableContent = new Readability(purifiedDOM.window.document).parse();
+
+  // TODO(important): Restrict the size of content to store
+
+  await db
+    .update(bookmarkLinks)
+    .set({
+      title: meta.title,
+      description: meta.description,
+      imageUrl: meta.image,
+      favicon: meta.logo,
+      content: readableContent?.textContent,
+      htmlContent: readableContent?.content,
+      crawledAt: new Date(),
+    })
+    .where(eq(bookmarkLinks.id, bookmarkId));
+
+  // Enqueue openai job
+  await OpenAIQueue.add("openai", {
+    bookmarkId,
+  });
+
+  // Update the search index
+  await SearchIndexingQueue.add("search_indexing", {
+    bookmarkId,
+    type: "index",
+  });
+}
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
new file mode 100644
index 00000000..5b6b62d5
--- /dev/null
+++ b/apps/workers/index.ts
@@ -0,0 +1,16 @@
+import "dotenv/config";
+import { CrawlerWorker } from "./crawlerWorker";
+import { OpenAiWorker } from "./openaiWorker";
+import { SearchIndexingWorker } from "./searchWorker";
+
+async function main() {
+  const [crawler, openai, search] = [
+    await CrawlerWorker.build(),
+    await OpenAiWorker.build(),
+    await SearchIndexingWorker.build(),
+  ];
+
+  await Promise.all([crawler.run(), openai.run(), search.run()]);
+}
+
+main();
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
new file mode 100644
index 00000000..1ec22d32
--- /dev/null
+++ b/apps/workers/openaiWorker.ts
@@ -0,0 +1,263 @@
+import { db } from "@hoarder/db";
+import logger from "@hoarder/shared/logger";
+import serverConfig from "@hoarder/shared/config";
+import {
+  OpenAIQueue,
+  SearchIndexingQueue,
+  ZOpenAIRequest,
+  queueConnectionDetails,
+  zOpenAIRequestSchema,
+} from "@hoarder/shared/queues";
+import { Job } from "bullmq";
+import OpenAI from "openai";
+import { z } from "zod";
+import { Worker } from "bullmq";
+import { bookmarkTags, bookmarks, tagsOnBookmarks } from "@hoarder/db/schema";
+import { and, eq, inArray } from "drizzle-orm";
+
+const openAIResponseSchema = z.object({
+  tags: z.array(z.string()),
+});
+
+async function attemptMarkTaggingStatus(
+  jobData: object | undefined,
+  status: "success" | "failure",
+) {
+  if (!jobData) {
+    return;
+  }
+  try {
+    const request = zOpenAIRequestSchema.parse(jobData);
+    await db
+      .update(bookmarks)
+      .set({
+        taggingStatus: status,
+      })
+      .where(eq(bookmarks.id, request.bookmarkId));
+  } catch (e) {
+    logger.error(`Something went wrong when marking the tagging status: ${e}`);
+  }
+}
+
+export class OpenAiWorker {
+  static async build() {
+    logger.info("Starting openai worker ...");
+    const worker = new Worker<ZOpenAIRequest, void>(
+      OpenAIQueue.name,
+      runOpenAI,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", async (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[openai][${jobId}] Completed successfully`);
+      await attemptMarkTaggingStatus(job?.data, "success");
+    });
+
+    worker.on("failed", async (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[openai][${jobId}] openai job failed: ${error}`);
+      await attemptMarkTaggingStatus(job?.data, "failure");
+    });
+
+    return worker;
+  }
+}
+
+const PROMPT_BASE = `
+I'm building a read-it-later app and I need your help with automatic tagging.
+Please analyze the text after the sentence "CONTENT START HERE:" and suggest relevant tags that describe its key themes, topics, and main ideas.
+Aim for a variety of tags, including broad categories, specific keywords, and potential sub-genres. If it's a famous website,
+you may also include a tag for the website. Tags should be lowercase and must not contain spaces. If a tag is not generic enough, don't
+include it. Aim for 3-5 tags. If there are no good tags, don't emit any. The content can include text for cookie consent and privacy policy; ignore those while tagging.
+You must respond in JSON with the key "tags" and the value being a list of tags.
+CONTENT START HERE:
+`;
+
+function buildPrompt(
+  bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
+) {
+  if (bookmark.link) {
+    if (!bookmark.link.description && !bookmark.link.content) {
+      throw new Error(
+        `No content found for link "${bookmark.id}". Skipping ...`,
+      );
+    }
+
+    let content = bookmark.link.content;
+    if (content) {
+      let words = content.split(" ");
+      if (words.length > 2000) {
+        words = words.slice(0, 2000); // Keep only the first 2000 words
+        content = words.join(" ");
+      }
+    }
+    return `
+${PROMPT_BASE}
+URL: ${bookmark.link.url}
+Title: ${bookmark.link.title || ""}
+Description: ${bookmark.link.description || ""}
+Content: ${content || ""}
+  `;
+  }
+
+  if (bookmark.text) {
+    // TODO: Ensure that the content doesn't exceed the context length of openai
+    return `
+${PROMPT_BASE}
+${bookmark.text.text}
+  `;
+  }
+
+  throw new Error("Unknown bookmark type");
+}
+
+async function fetchBookmark(linkId: string) {
+  return await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, linkId),
+    with: {
+      link: true,
+      text: true,
+    },
+  });
+}
+
+async function inferTags(
+  jobId: string,
+  bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
+  openai: OpenAI,
+) {
+  const chatCompletion = await openai.chat.completions.create({
+    messages: [{ role: "system", content: buildPrompt(bookmark) }],
+    model: "gpt-3.5-turbo-0125",
+    response_format: { type: "json_object" },
+  });
+
+  const response = chatCompletion.choices[0].message.content;
+  if (!response) {
+    throw new Error(`[openai][${jobId}] Got no message content from OpenAI`);
+  }
+
+  try {
+    let tags = openAIResponseSchema.parse(JSON.parse(response)).tags;
+    logger.info(
+      `[openai][${jobId}] Inferring tags for bookmark "${bookmark.id}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`,
+    );
+
+    // Sometimes the tags contain the hashtag symbol, let's strip them out if they do.
+    tags = tags.map((t) => {
+      if (t.startsWith("#")) {
+        return t.slice(1);
+      }
+      return t;
+    });
+
+    return tags;
+  } catch (e) {
+    throw new Error(
+      `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`,
+    );
+  }
+}
+
+async function connectTags(
+  bookmarkId: string,
+  newTags: string[],
+  userId: string,
+) {
+  if (newTags.length === 0) {
+    return;
+  }
+
+  await db.transaction(async (tx) => {
+    // Create tags that didn't exist previously
+    await tx
+      .insert(bookmarkTags)
+      .values(
+        newTags.map((t) => ({
+          name: t,
+          userId,
+        })),
+      )
+      .onConflictDoNothing();
+
+    const newTagIds = (
+      await tx.query.bookmarkTags.findMany({
+        where: and(
+          eq(bookmarkTags.userId, userId),
+          inArray(bookmarkTags.name, newTags),
+        ),
+        columns: {
+          id: true,
+        },
+      })
+    ).map((r) => r.id);
+
+    // Delete old AI tags
+    await tx
+      .delete(tagsOnBookmarks)
+      .where(
+        and(
+          eq(tagsOnBookmarks.attachedBy, "ai"),
+          eq(tagsOnBookmarks.bookmarkId, bookmarkId),
+        ),
+      );
+
+    // Attach new ones
+    await tx
+      .insert(tagsOnBookmarks)
+      .values(
+        newTagIds.map((tagId) => ({
+          tagId,
+          bookmarkId,
+          attachedBy: "ai" as const,
+        })),
+      )
+      .onConflictDoNothing();
+  });
+}
+
+async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  const { openAI } = serverConfig;
+
+  if (!openAI.apiKey) {
+    logger.debug(
+      `[openai][${jobId}] OpenAI is not configured, nothing to do now`,
+    );
+    return;
+  }
+
+  const openai = new OpenAI({
+    apiKey: openAI.apiKey,
+  });
+
+  const request = zOpenAIRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  const { bookmarkId } = request.data;
+  const bookmark = await fetchBookmark(bookmarkId);
+  if (!bookmark) {
+    throw new Error(
+      `[openai][${jobId}] bookmark with id ${bookmarkId} was not found`,
    );
+  }
+
+  const tags = await inferTags(jobId, bookmark, openai);
+
+  await connectTags(bookmarkId, tags, bookmark.userId);
+
+  // Update the search index
+  await SearchIndexingQueue.add("search_indexing", {
+    bookmarkId,
+    type: "index",
+  });
+}
diff --git a/apps/workers/package.json b/apps/workers/package.json
new file mode 100644
index 00000000..8446a54d
--- /dev/null
+++ b/apps/workers/package.json
@@ -0,0 +1,56 @@
+{
+  "$schema": "https://json.schemastore.org/package.json",
+  "name": "@hoarder/workers",
+  "version": "0.1.0",
+  "private": true,
+  "type": "module",
+  "dependencies": {
+    "@hoarder/db": "workspace:*",
+    "@hoarder/shared": "workspace:*",
+    "@mozilla/readability": "^0.5.0",
+    "@tsconfig/node21": "^21.0.1",
+    "async-mutex": "^0.4.1",
+    "bullmq": "^5.1.9",
+    "dompurify": "^3.0.9",
+    "dotenv": "^16.4.1",
+    "drizzle-orm": "^0.29.4",
+    "jsdom": "^24.0.0",
+    "metascraper": "^5.43.4",
+    "metascraper-description": "^5.43.4",
+    "metascraper-image": "^5.43.4",
+    "metascraper-logo": "^5.43.4",
+    "metascraper-logo-favicon": "^5.43.4",
+    "metascraper-readability": "^5.43.4",
+    "metascraper-title": "^5.43.4",
+    "metascraper-twitter": "^5.43.4",
+    "metascraper-url": "^5.43.4",
+    "openai": "^4.29.0",
+    "puppeteer": "^22.0.0",
+    "puppeteer-extra": "^3.3.6",
+    "puppeteer-extra-plugin-adblocker": "^2.13.6",
+    "puppeteer-extra-plugin-stealth": "^2.11.2",
+    "tsx": "^4.7.1",
+    "typescript": "^5.3.3",
+    "zod": "^3.22.4"
+  },
+  "devDependencies": {
+    "@hoarder/eslint-config": "workspace:^0.2.0",
+    "@hoarder/prettier-config": "workspace:^0.1.0",
+    "@hoarder/tsconfig": "workspace:^0.1.0",
+    "@types/dompurify": "^3.0.5",
+    "@types/jsdom": "^21.1.6",
+    "@types/metascraper": "^5.14.3"
+  },
+  "scripts": {
+    "start": "tsx watch index.ts",
+    "start:prod": "tsx index.ts",
+    "typecheck": "tsc --noEmit"
+  },
+  "eslintConfig": {
+    "root": true,
+    "extends": [
+      "@hoarder/eslint-config/base"
+    ]
+  },
+  "prettier": "@hoarder/prettier-config"
+}
diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts
new file mode 100644
index 00000000..618e7c89
--- /dev/null
+++ b/apps/workers/searchWorker.ts
@@ -0,0 +1,116 @@
+import { db } from "@hoarder/db";
+import logger from "@hoarder/shared/logger";
+import { getSearchIdxClient } from "@hoarder/shared/search";
+import {
+  SearchIndexingQueue,
+  ZSearchIndexingRequest,
+  queueConnectionDetails,
+  zSearchIndexingRequestSchema,
+} from "@hoarder/shared/queues";
+import { Job } from "bullmq";
+import { Worker } from "bullmq";
+import { bookmarks } from "@hoarder/db/schema";
+import { eq } from "drizzle-orm";
+
+export class SearchIndexingWorker {
+  static async build() {
+    logger.info("Starting search indexing worker ...");
+    const worker = new Worker<ZSearchIndexingRequest, void>(
+      SearchIndexingQueue.name,
+      runSearchIndexing,
+      {
+        connection: queueConnectionDetails,
+        autorun: false,
+      },
+    );
+
+    worker.on("completed", (job) => {
+      const jobId = job?.id || "unknown";
+      logger.info(`[search][${jobId}] Completed successfully`);
+    });
+
+    worker.on("failed", (job, error) => {
+      const jobId = job?.id || "unknown";
+      logger.error(`[search][${jobId}] Search indexing job failed: ${error}`);
+    });
+
+    return worker;
+  }
+}
+
+async function runIndex(
+  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+  bookmarkId: string,
+) {
+  const bookmark = await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, bookmarkId),
+    with: {
+      link: true,
+      text: true,
+      tagsOnBookmarks: {
+        with: {
+          tag: true,
+        },
+      },
+    },
+  });
+
+  if (!bookmark) {
+    throw new Error(`Bookmark ${bookmarkId} not found`);
+  }
+
+  await searchClient.addDocuments([
+    {
+      id: bookmark.id,
+      userId: bookmark.userId,
+      ...(bookmark.link
+        ? {
+            url: bookmark.link.url,
+            title: bookmark.link.title,
+            description: bookmark.link.description,
+            content: bookmark.link.content,
+          }
+        : undefined),
+      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
+      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
+    },
+  ]);
+}
+
+async function runDelete(
+  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
+  bookmarkId: string,
+) {
+  await searchClient.deleteDocument(bookmarkId);
+}
+
+async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
+  const jobId = job.id || "unknown";
+
+  const request = zSearchIndexingRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  const searchClient = await getSearchIdxClient();
+  if (!searchClient) {
+    logger.debug(
+      `[search][${jobId}] Search is not configured, nothing to do now`,
+    );
+    return;
+  }
+
+  const bookmarkId = request.data.bookmarkId;
+  switch (request.data.type) {
+    case "index": {
+      await runIndex(searchClient, bookmarkId);
+      break;
+    }
+    case "delete": {
+      await runDelete(searchClient, bookmarkId);
+      break;
+    }
+  }
+}
diff --git a/apps/workers/tsconfig.json b/apps/workers/tsconfig.json
new file mode 100644
index 00000000..24b9a10d
--- /dev/null
+++ b/apps/workers/tsconfig.json
@@ -0,0 +1,10 @@
+{
+  "$schema": "https://json.schemastore.org/tsconfig",
+  "extends": "@hoarder/tsconfig/node.json",
+  "include": ["**/*.ts"],
+  "exclude": ["node_modules"],
+  "compilerOptions": {
+    "baseUrl": ".",
+    "tsBuildInfoFile": "node_modules/.cache/tsbuildinfo.json"
+  }
+}
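The commit wires up only the consuming side of each queue; nothing in `apps/workers` enqueues the initial crawl. For context, a producer would add a job in the shape `zCrawlLinkRequestSchema` expects. A minimal sketch, assuming `LinkCrawlerQueue` is the bullmq `Queue` instance exported by `@hoarder/shared/queues` (the helper name `enqueueCrawl` and the job name `"crawl"` are hypothetical):

```ts
import { LinkCrawlerQueue } from "@hoarder/shared/queues";

// Hypothetical producer-side helper, not part of this commit: kick off the
// pipeline for a bookmark. The crawler worker then fans out to the openai
// and search indexing queues itself once the page content is stored.
export async function enqueueCrawl(bookmarkId: string) {
  await LinkCrawlerQueue.add("crawl", { bookmarkId });
}
```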
