| author | MohamedBassem <me@mbassem.com> | 2024-03-13 21:43:44 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2024-03-14 16:40:45 +0000 |
| commit | 04572a8e5081b1e4871e273cde9dbaaa44c52fe0 (patch) | |
| tree | 8e993acb732a50d1306d4d6953df96c165c57f57 /packages/workers | |
| parent | 2df08ed08c065e8b91bc8df0266bd4bcbb062be4 (diff) | |
| download | karakeep-04572a8e5081b1e4871e273cde9dbaaa44c52fe0.tar.zst | |
structure: Create apps dir and copy tooling dir from t3-turbo repo
Diffstat (limited to 'packages/workers')
| -rw-r--r-- | packages/workers/crawler.ts | 201 |
| -rw-r--r-- | packages/workers/index.ts | 16 |
| -rw-r--r-- | packages/workers/openai.ts | 263 |
| -rw-r--r-- | packages/workers/package.json | 45 |
| -rw-r--r-- | packages/workers/search.ts | 116 |
| -rw-r--r-- | packages/workers/tsconfig.json | 11 |
6 files changed, 0 insertions, 652 deletions
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
deleted file mode 100644
index 5db2da7b..00000000
--- a/packages/workers/crawler.ts
+++ /dev/null
@@ -1,201 +0,0 @@
-import logger from "@hoarder/shared/logger";
-import {
-  LinkCrawlerQueue,
-  OpenAIQueue,
-  SearchIndexingQueue,
-  ZCrawlLinkRequest,
-  queueConnectionDetails,
-  zCrawlLinkRequestSchema,
-} from "@hoarder/shared/queues";
-import DOMPurify from "dompurify";
-import { JSDOM } from "jsdom";
-
-import { Worker } from "bullmq";
-import { Job } from "bullmq";
-
-import { db } from "@hoarder/db";
-
-import { Browser } from "puppeteer";
-import puppeteer from "puppeteer-extra";
-import StealthPlugin from "puppeteer-extra-plugin-stealth";
-import AdblockerPlugin from "puppeteer-extra-plugin-adblocker";
-
-import metascraper from "metascraper";
-
-import metascraperDescription from "metascraper-description";
-import metascraperImage from "metascraper-image";
-import metascraperLogo from "metascraper-logo-favicon";
-import metascraperTitle from "metascraper-title";
-import metascraperUrl from "metascraper-url";
-import metascraperTwitter from "metascraper-twitter";
-import metascraperReadability from "metascraper-readability";
-import { Mutex } from "async-mutex";
-import assert from "assert";
-import serverConfig from "@hoarder/shared/config";
-import { bookmarkLinks } from "@hoarder/db/schema";
-import { eq } from "drizzle-orm";
-import { Readability } from "@mozilla/readability";
-
-const metascraperParser = metascraper([
-  metascraperReadability(),
-  metascraperTitle(),
-  metascraperDescription(),
-  metascraperTwitter(),
-  metascraperImage(),
-  metascraperLogo(),
-  metascraperUrl(),
-]);
-
-let browser: Browser | undefined;
-// Guards the interactions with the browser instance.
-// This is needed given that most of the browser APIs are async.
-const browserMutex = new Mutex();
-
-async function launchBrowser() {
-  browser = undefined;
-  await browserMutex.runExclusive(async () => {
-    browser = await puppeteer.launch({
-      headless: serverConfig.crawler.headlessBrowser,
-      executablePath: serverConfig.crawler.browserExecutablePath,
-      userDataDir: serverConfig.crawler.browserUserDataDir,
-    });
-    browser.on("disconnected", async () => {
-      logger.info(
-        "The puppeteer browser got disconnected. Will attempt to launch it again.",
-      );
-      await launchBrowser();
-    });
-  });
-}
-
-export class CrawlerWorker {
-  static async build() {
-    puppeteer.use(StealthPlugin());
-    puppeteer.use(
-      AdblockerPlugin({
-        blockTrackersAndAnnoyances: true,
-      }),
-    );
-    await launchBrowser();
-
-    logger.info("Starting crawler worker ...");
-    const worker = new Worker<ZCrawlLinkRequest, void>(
-      LinkCrawlerQueue.name,
-      runCrawler,
-      {
-        connection: queueConnectionDetails,
-        autorun: false,
-      },
-    );
-
-    worker.on("completed", (job) => {
-      const jobId = job?.id || "unknown";
-      logger.info(`[Crawler][${jobId}] Completed successfully`);
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id || "unknown";
-      logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
-    });
-
-    return worker;
-  }
-}
-
-async function getBookmarkUrl(bookmarkId: string) {
-  const bookmark = await db.query.bookmarkLinks.findFirst({
-    where: eq(bookmarkLinks.id, bookmarkId),
-  });
-
-  if (!bookmark) {
-    throw new Error("The bookmark either doesn't exist or not a link");
-  }
-  return bookmark.url;
-}
-
-async function crawlPage(url: string) {
-  assert(browser);
-  const context = await browser.createBrowserContext();
-
-  try {
-    const page = await context.newPage();
-
-    await page.goto(url, {
-      timeout: 10000, // 10 seconds
-    });
-
-    // Wait until there's at most two connections for 2 seconds
-    // Attempt to wait only for 5 seconds
-    await Promise.race([
-      page.waitForNetworkIdle({
-        idleTime: 1000, // 1 sec
-        concurrency: 2,
-      }),
-      new Promise((f) => setTimeout(f, 5000)),
-    ]);
-
-    const htmlContent = await page.content();
-    return htmlContent;
-  } finally {
-    await context.close();
-  }
-}
-
-async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
-  const jobId = job.id || "unknown";
-
-  const request = zCrawlLinkRequestSchema.safeParse(job.data);
-  if (!request.success) {
-    logger.error(
-      `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
-    );
-    return;
-  }
-
-  const { bookmarkId } = request.data;
-  const url = await getBookmarkUrl(bookmarkId);
-
-  logger.info(
-    `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
-  );
-  // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc)
-
-  const htmlContent = await crawlPage(url);
-
-  const meta = await metascraperParser({
-    url,
-    html: htmlContent,
-  });
-
-  const window = new JSDOM("").window;
-  const purify = DOMPurify(window);
-  const purifiedHTML = purify.sanitize(htmlContent);
-  const purifiedDOM = new JSDOM(purifiedHTML, { url });
-  const readableContent = new Readability(purifiedDOM.window.document).parse();
-
-  // TODO(important): Restrict the size of content to store
-
-  await db
-    .update(bookmarkLinks)
-    .set({
-      title: meta.title,
-      description: meta.description,
-      imageUrl: meta.image,
-      favicon: meta.logo,
-      content: readableContent?.textContent,
-      htmlContent: readableContent?.content,
-      crawledAt: new Date(),
-    })
-    .where(eq(bookmarkLinks.id, bookmarkId));
-
-  // Enqueue openai job
-  OpenAIQueue.add("openai", {
-    bookmarkId,
-  });
-
-  // Update the search index
-  SearchIndexingQueue.add("search_indexing", {
-    bookmarkId,
-    type: "index",
-  });
-}
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
deleted file mode 100644
index 295eeaef..00000000
--- a/packages/workers/index.ts
+++ /dev/null
@@ -1,16 +0,0 @@
-import "dotenv/config";
-import { CrawlerWorker } from "./crawler";
-import { OpenAiWorker } from "./openai";
-import { SearchIndexingWorker } from "./search";
-
-async function main() {
-  const [crawler, openai, search] = [
-    await CrawlerWorker.build(),
-    await OpenAiWorker.build(),
-    await SearchIndexingWorker.build(),
-  ];
-
-  await Promise.all([crawler.run(), openai.run(), search.run()]);
-}
-
-main();
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
deleted file mode 100644
index 1ec22d32..00000000
--- a/packages/workers/openai.ts
+++ /dev/null
@@ -1,263 +0,0 @@
-import { db } from "@hoarder/db";
-import logger from "@hoarder/shared/logger";
-import serverConfig from "@hoarder/shared/config";
-import {
-  OpenAIQueue,
-  SearchIndexingQueue,
-  ZOpenAIRequest,
-  queueConnectionDetails,
-  zOpenAIRequestSchema,
-} from "@hoarder/shared/queues";
-import { Job } from "bullmq";
-import OpenAI from "openai";
-import { z } from "zod";
-import { Worker } from "bullmq";
-import { bookmarkTags, bookmarks, tagsOnBookmarks } from "@hoarder/db/schema";
-import { and, eq, inArray } from "drizzle-orm";
-
-const openAIResponseSchema = z.object({
-  tags: z.array(z.string()),
-});
-
-async function attemptMarkTaggingStatus(
-  jobData: object | undefined,
-  status: "success" | "failure",
-) {
-  if (!jobData) {
-    return;
-  }
-  try {
-    const request = zOpenAIRequestSchema.parse(jobData);
-    await db
-      .update(bookmarks)
-      .set({
-        taggingStatus: status,
-      })
-      .where(eq(bookmarks.id, request.bookmarkId));
-  } catch (e) {
-    console.log(`Something went wrong when marking the tagging status: ${e}`);
-  }
-}
-
-export class OpenAiWorker {
-  static async build() {
-    logger.info("Starting openai worker ...");
-    const worker = new Worker<ZOpenAIRequest, void>(
-      OpenAIQueue.name,
-      runOpenAI,
-      {
-        connection: queueConnectionDetails,
-        autorun: false,
-      },
-    );
-
-    worker.on("completed", async (job) => {
-      const jobId = job?.id || "unknown";
-      logger.info(`[openai][${jobId}] Completed successfully`);
-      await attemptMarkTaggingStatus(job?.data, "success");
-    });
-
-    worker.on("failed", async (job, error) => {
-      const jobId = job?.id || "unknown";
-      logger.error(`[openai][${jobId}] openai job failed: ${error}`);
-      await attemptMarkTaggingStatus(job?.data, "failure");
-    });
-
-    return worker;
-  }
-}
-
-const PROMPT_BASE = `
-I'm building a read-it-later app and I need your help with automatic tagging.
-Please analyze the text after the sentence "CONTENT START HERE:" and suggest relevant tags that describe its key themes, topics, and main ideas.
-Aim for a variety of tags, including broad categories, specific keywords, and potential sub-genres. If it's a famous website
-you may also include a tag for the website. Tags should be lowercases and don't contain spaces. If the tag is not generic enough, don't
-include it. Aim for 3-5 tags. If there are no good tags, don't emit any. The content can include text for cookie consent and privacy policy, ignore those while tagging.
-You must respond in JSON with the key "tags" and the value is list of tags.
-CONTENT START HERE:
-`;
-
-function buildPrompt(
-  bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
-) {
-  if (bookmark.link) {
-    if (!bookmark.link.description && !bookmark.link.content) {
-      throw new Error(
-        `No content found for link "${bookmark.id}". Skipping ...`,
-      );
-    }
-
-    let content = bookmark.link.content;
-    if (content) {
-      let words = content.split(" ");
-      if (words.length > 2000) {
-        words = words.slice(2000);
-        content = words.join(" ");
-      }
-    }
-    return `
-${PROMPT_BASE}
-URL: ${bookmark.link.url}
-Title: ${bookmark.link.title || ""}
-Description: ${bookmark.link.description || ""}
-Content: ${content || ""}
-  `;
-  }
-
-  if (bookmark.text) {
-    // TODO: Ensure that the content doesn't exceed the context length of openai
-    return `
-${PROMPT_BASE}
-${bookmark.text.text}
-  `;
-  }
-
-  throw new Error("Unknown bookmark type");
-}
-
-async function fetchBookmark(linkId: string) {
-  return await db.query.bookmarks.findFirst({
-    where: eq(bookmarks.id, linkId),
-    with: {
-      link: true,
-      text: true,
-    },
-  });
-}
-
-async function inferTags(
-  jobId: string,
-  bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
-  openai: OpenAI,
-) {
-  const chatCompletion = await openai.chat.completions.create({
-    messages: [{ role: "system", content: buildPrompt(bookmark) }],
-    model: "gpt-3.5-turbo-0125",
-    response_format: { type: "json_object" },
-  });
-
-  const response = chatCompletion.choices[0].message.content;
-  if (!response) {
-    throw new Error(`[openai][${jobId}] Got no message content from OpenAI`);
-  }
-
-  try {
-    let tags = openAIResponseSchema.parse(JSON.parse(response)).tags;
-    logger.info(
-      `[openai][${jobId}] Inferring tag for bookmark "${bookmark.id}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`,
-    );
-
-    // Sometimes the tags contain the hashtag symbol, let's strip them out if they do.
-    tags = tags.map((t) => {
-      if (t.startsWith("#")) {
-        return t.slice(1);
-      }
-      return t;
-    });
-
-    return tags;
-  } catch (e) {
-    throw new Error(
-      `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`,
-    );
-  }
-}
-
-async function connectTags(
-  bookmarkId: string,
-  newTags: string[],
-  userId: string,
-) {
-  if (newTags.length == 0) {
-    return;
-  }
-
-  await db.transaction(async (tx) => {
-    // Create tags that didn't exist previously
-    await tx
-      .insert(bookmarkTags)
-      .values(
-        newTags.map((t) => ({
-          name: t,
-          userId,
-        })),
-      )
-      .onConflictDoNothing();
-
-    const newTagIds = (
-      await tx.query.bookmarkTags.findMany({
-        where: and(
-          eq(bookmarkTags.userId, userId),
-          inArray(bookmarkTags.name, newTags),
-        ),
-        columns: {
-          id: true,
-        },
-      })
-    ).map((r) => r.id);
-
-    // Delete old AI tags
-    await tx
-      .delete(tagsOnBookmarks)
-      .where(
-        and(
-          eq(tagsOnBookmarks.attachedBy, "ai"),
-          eq(tagsOnBookmarks.bookmarkId, bookmarkId),
-        ),
-      );
-
-    // Attach new ones
-    await tx
-      .insert(tagsOnBookmarks)
-      .values(
-        newTagIds.map((tagId) => ({
-          tagId,
-          bookmarkId,
-          attachedBy: "ai" as const,
-        })),
-      )
-      .onConflictDoNothing();
-  });
-}
-
-async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
-  const jobId = job.id || "unknown";
-
-  const { openAI } = serverConfig;
-
-  if (!openAI.apiKey) {
-    logger.debug(
-      `[openai][${jobId}] OpenAI is not configured, nothing to do now`,
-    );
-    return;
-  }
-
-  const openai = new OpenAI({
-    apiKey: openAI.apiKey,
-  });
-
-  const request = zOpenAIRequestSchema.safeParse(job.data);
-  if (!request.success) {
-    throw new Error(
-      `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`,
-    );
-  }
-
-  const { bookmarkId } = request.data;
-  const bookmark = await fetchBookmark(bookmarkId);
-  if (!bookmark) {
-    throw new Error(
-      `[openai][${jobId}] bookmark with id ${bookmarkId} was not found`,
-    );
-  }
-
-  const tags = await inferTags(jobId, bookmark, openai);
-
-  await connectTags(bookmarkId, tags, bookmark.userId);
-
-  // Update the search index
-  SearchIndexingQueue.add("search_indexing", {
-    bookmarkId,
-    type: "index",
-  });
-}
diff --git a/packages/workers/package.json b/packages/workers/package.json
deleted file mode 100644
index f2fc164c..00000000
--- a/packages/workers/package.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-  "$schema": "https://json.schemastore.org/package.json",
-  "name": "@hoarder/workers",
-  "version": "0.1.0",
-  "private": true,
-  "dependencies": {
-    "@hoarder/db": "workspace:*",
-    "@hoarder/shared": "workspace:*",
-    "@mozilla/readability": "^0.5.0",
-    "@tsconfig/node21": "^21.0.1",
-    "async-mutex": "^0.4.1",
-    "bullmq": "^5.1.9",
-    "dompurify": "^3.0.9",
-    "dotenv": "^16.4.1",
-    "drizzle-orm": "^0.29.4",
-    "jsdom": "^24.0.0",
-    "metascraper": "^5.43.4",
-    "metascraper-description": "^5.43.4",
-    "metascraper-image": "^5.43.4",
-    "metascraper-logo": "^5.43.4",
-    "metascraper-logo-favicon": "^5.43.4",
-    "metascraper-readability": "^5.43.4",
-    "metascraper-title": "^5.43.4",
-    "metascraper-twitter": "^5.43.4",
-    "metascraper-url": "^5.43.4",
-    "openai": "^4.26.1",
-    "puppeteer": "^22.0.0",
-    "puppeteer-extra": "^3.3.6",
-    "puppeteer-extra-plugin-adblocker": "^2.13.6",
-    "puppeteer-extra-plugin-stealth": "^2.11.2",
-    "tsx": "^4.7.1",
-    "typescript": "^5",
-    "zod": "^3.22.4"
-  },
-  "devDependencies": {
-    "@types/dompurify": "^3.0.5",
-    "@types/jsdom": "^21.1.6",
-    "@types/metascraper": "^5.14.3"
-  },
-  "scripts": {
-    "start": "tsx watch index.ts",
-    "start:prod": "tsx index.ts",
-    "typecheck": "tsc --noEmit"
-  }
-}
diff --git a/packages/workers/search.ts b/packages/workers/search.ts
deleted file mode 100644
index 618e7c89..00000000
--- a/packages/workers/search.ts
+++ /dev/null
@@ -1,116 +0,0 @@
-import { db } from "@hoarder/db";
-import logger from "@hoarder/shared/logger";
-import { getSearchIdxClient } from "@hoarder/shared/search";
-import {
-  SearchIndexingQueue,
-  ZSearchIndexingRequest,
-  queueConnectionDetails,
-  zSearchIndexingRequestSchema,
-} from "@hoarder/shared/queues";
-import { Job } from "bullmq";
-import { Worker } from "bullmq";
-import { bookmarks } from "@hoarder/db/schema";
-import { eq } from "drizzle-orm";
-
-export class SearchIndexingWorker {
-  static async build() {
-    logger.info("Starting search indexing worker ...");
-    const worker = new Worker<ZSearchIndexingRequest, void>(
-      SearchIndexingQueue.name,
-      runSearchIndexing,
-      {
-        connection: queueConnectionDetails,
-        autorun: false,
-      },
-    );
-
-    worker.on("completed", (job) => {
-      const jobId = job?.id || "unknown";
-      logger.info(`[search][${jobId}] Completed successfully`);
-    });
-
-    worker.on("failed", (job, error) => {
-      const jobId = job?.id || "unknown";
-      logger.error(`[search][${jobId}] openai job failed: ${error}`);
-    });
-
-    return worker;
-  }
-}
-
-async function runIndex(
-  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
-  bookmarkId: string,
-) {
-  const bookmark = await db.query.bookmarks.findFirst({
-    where: eq(bookmarks.id, bookmarkId),
-    with: {
-      link: true,
-      text: true,
-      tagsOnBookmarks: {
-        with: {
-          tag: true,
-        },
-      },
-    },
-  });
-
-  if (!bookmark) {
-    throw new Error(`Bookmark ${bookmarkId} not found`);
-  }
-
-  searchClient.addDocuments([
-    {
-      id: bookmark.id,
-      userId: bookmark.userId,
-      ...(bookmark.link
-        ? {
-            url: bookmark.link.url,
-            title: bookmark.link.title,
-            description: bookmark.link.description,
-            content: bookmark.link.content,
-          }
-        : undefined),
-      ...(bookmark.text ? { content: bookmark.text.text } : undefined),
-      tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
-    },
-  ]);
-}
-
-async function runDelete(
-  searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
-  bookmarkId: string,
-) {
-  await searchClient.deleteDocument(bookmarkId);
-}
-
-async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
-  const jobId = job.id || "unknown";
-
-  const request = zSearchIndexingRequestSchema.safeParse(job.data);
-  if (!request.success) {
-    throw new Error(
-      `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
-    );
-  }
-
-  const searchClient = await getSearchIdxClient();
-  if (!searchClient) {
-    logger.debug(
-      `[search][${jobId}] Search is not configured, nothing to do now`,
-    );
-    return;
-  }
-
-  const bookmarkId = request.data.bookmarkId;
-  switch (request.data.type) {
-    case "index": {
-      await runIndex(searchClient, bookmarkId);
-      break;
-    }
-    case "delete": {
-      await runDelete(searchClient, bookmarkId);
-      break;
-    }
-  }
-}
diff --git a/packages/workers/tsconfig.json b/packages/workers/tsconfig.json
deleted file mode 100644
index cf49c407..00000000
--- a/packages/workers/tsconfig.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
-  "$schema": "https://json.schemastore.org/tsconfig",
-  "extends": "@tsconfig/node21/tsconfig.json",
-  "include": ["**/*.ts"],
-  "exclude": ["node_modules"],
-  "compilerOptions": {
-    "module": "ESNext",
-    "moduleResolution": "node",
-    "esModuleInterop": true
-  }
-}