author    MohamedBassem <me@mbassem.com>    2024-03-13 21:43:44 +0000
committer Mohamed Bassem <me@mbassem.com>   2024-03-14 16:40:45 +0000
commit    04572a8e5081b1e4871e273cde9dbaaa44c52fe0 (patch)
tree      8e993acb732a50d1306d4d6953df96c165c57f57 /packages/workers
parent    2df08ed08c065e8b91bc8df0266bd4bcbb062be4 (diff)
download  karakeep-04572a8e5081b1e4871e273cde9dbaaa44c52fe0.tar.zst
structure: Create apps dir and copy tooling dir from t3-turbo repo
Diffstat (limited to 'packages/workers')
-rw-r--r--  packages/workers/crawler.ts    | 201
-rw-r--r--  packages/workers/index.ts      |  16
-rw-r--r--  packages/workers/openai.ts     | 263
-rw-r--r--  packages/workers/package.json  |  45
-rw-r--r--  packages/workers/search.ts     | 116
-rw-r--r--  packages/workers/tsconfig.json |  11
6 files changed, 0 insertions, 652 deletions
diff --git a/packages/workers/crawler.ts b/packages/workers/crawler.ts
deleted file mode 100644
index 5db2da7b..00000000
--- a/packages/workers/crawler.ts
+++ /dev/null
@@ -1,201 +0,0 @@
-import logger from "@hoarder/shared/logger";
-import {
- LinkCrawlerQueue,
- OpenAIQueue,
- SearchIndexingQueue,
- ZCrawlLinkRequest,
- queueConnectionDetails,
- zCrawlLinkRequestSchema,
-} from "@hoarder/shared/queues";
-import DOMPurify from "dompurify";
-import { JSDOM } from "jsdom";
-
-import { Worker } from "bullmq";
-import { Job } from "bullmq";
-
-import { db } from "@hoarder/db";
-
-import { Browser } from "puppeteer";
-import puppeteer from "puppeteer-extra";
-import StealthPlugin from "puppeteer-extra-plugin-stealth";
-import AdblockerPlugin from "puppeteer-extra-plugin-adblocker";
-
-import metascraper from "metascraper";
-
-import metascraperDescription from "metascraper-description";
-import metascraperImage from "metascraper-image";
-import metascraperLogo from "metascraper-logo-favicon";
-import metascraperTitle from "metascraper-title";
-import metascraperUrl from "metascraper-url";
-import metascraperTwitter from "metascraper-twitter";
-import metascraperReadability from "metascraper-readability";
-import { Mutex } from "async-mutex";
-import assert from "assert";
-import serverConfig from "@hoarder/shared/config";
-import { bookmarkLinks } from "@hoarder/db/schema";
-import { eq } from "drizzle-orm";
-import { Readability } from "@mozilla/readability";
-
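-// Metadata extraction pipeline: each metascraper plugin contributes one
-// field (title, description, social image, favicon, canonical URL).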
-const metascraperParser = metascraper([
- metascraperReadability(),
- metascraperTitle(),
- metascraperDescription(),
- metascraperTwitter(),
- metascraperImage(),
- metascraperLogo(),
- metascraperUrl(),
-]);
-
-let browser: Browser | undefined;
-// Guards the interactions with the browser instance.
-// This is needed given that most of the browser APIs are async.
-const browserMutex = new Mutex();
-
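-// (Re)launches the shared browser. The "disconnected" handler makes this
-// self-healing: if the browser crashes or exits, it is relaunched.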
-async function launchBrowser() {
- browser = undefined;
- await browserMutex.runExclusive(async () => {
- browser = await puppeteer.launch({
- headless: serverConfig.crawler.headlessBrowser,
- executablePath: serverConfig.crawler.browserExecutablePath,
- userDataDir: serverConfig.crawler.browserUserDataDir,
- });
- browser.on("disconnected", async () => {
- logger.info(
- "The puppeteer browser got disconnected. Will attempt to launch it again.",
- );
- await launchBrowser();
- });
- });
-}
-
-export class CrawlerWorker {
- static async build() {
- puppeteer.use(StealthPlugin());
- puppeteer.use(
- AdblockerPlugin({
- blockTrackersAndAnnoyances: true,
- }),
- );
- await launchBrowser();
-
- logger.info("Starting crawler worker ...");
- const worker = new Worker<ZCrawlLinkRequest, void>(
- LinkCrawlerQueue.name,
- runCrawler,
- {
- connection: queueConnectionDetails,
- autorun: false,
- },
- );
-
- worker.on("completed", (job) => {
- const jobId = job?.id || "unknown";
- logger.info(`[Crawler][${jobId}] Completed successfully`);
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id || "unknown";
- logger.error(`[Crawler][${jobId}] Crawling job failed: ${error}`);
- });
-
- return worker;
- }
-}
-
-async function getBookmarkUrl(bookmarkId: string) {
- const bookmark = await db.query.bookmarkLinks.findFirst({
- where: eq(bookmarkLinks.id, bookmarkId),
- });
-
- if (!bookmark) {
- throw new Error("The bookmark either doesn't exist or not a link");
- }
- return bookmark.url;
-}
-
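-// Each crawl gets its own browser context so that cookies and storage from
-// one crawled page can't leak into another.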
-async function crawlPage(url: string) {
- assert(browser);
- const context = await browser.createBrowserContext();
-
- try {
- const page = await context.newPage();
-
- await page.goto(url, {
- timeout: 10000, // 10 seconds
- });
-
-    // Wait until the network is mostly idle (at most two in-flight
-    // connections for 1 second), but give up after 5 seconds.
- await Promise.race([
- page.waitForNetworkIdle({
- idleTime: 1000, // 1 sec
- concurrency: 2,
- }),
- new Promise((f) => setTimeout(f, 5000)),
- ]);
-
- const htmlContent = await page.content();
- return htmlContent;
- } finally {
- await context.close();
- }
-}
-
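-// Job pipeline: validate the request, crawl the page, extract metadata,
-// sanitize the HTML and run Readability, persist the result, then enqueue
-// the tagging and search indexing jobs.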
-async function runCrawler(job: Job<ZCrawlLinkRequest, void>) {
- const jobId = job.id || "unknown";
-
- const request = zCrawlLinkRequestSchema.safeParse(job.data);
- if (!request.success) {
- logger.error(
- `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- return;
- }
-
- const { bookmarkId } = request.data;
- const url = await getBookmarkUrl(bookmarkId);
-
- logger.info(
- `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`,
- );
- // TODO(IMPORTANT): Run security validations on the input URL (e.g. deny localhost, etc)
-
- const htmlContent = await crawlPage(url);
-
- const meta = await metascraperParser({
- url,
- html: htmlContent,
- });
-
- const window = new JSDOM("").window;
- const purify = DOMPurify(window);
- const purifiedHTML = purify.sanitize(htmlContent);
- const purifiedDOM = new JSDOM(purifiedHTML, { url });
- const readableContent = new Readability(purifiedDOM.window.document).parse();
-
- // TODO(important): Restrict the size of content to store
-
- await db
- .update(bookmarkLinks)
- .set({
- title: meta.title,
- description: meta.description,
- imageUrl: meta.image,
- favicon: meta.logo,
- content: readableContent?.textContent,
- htmlContent: readableContent?.content,
- crawledAt: new Date(),
- })
- .where(eq(bookmarkLinks.id, bookmarkId));
-
- // Enqueue openai job
-  await OpenAIQueue.add("openai", {
- bookmarkId,
- });
-
- // Update the search index
-  await SearchIndexingQueue.add("search_indexing", {
- bookmarkId,
- type: "index",
- });
-}
diff --git a/packages/workers/index.ts b/packages/workers/index.ts
deleted file mode 100644
index 295eeaef..00000000
--- a/packages/workers/index.ts
+++ /dev/null
@@ -1,16 +0,0 @@
-import "dotenv/config";
-import { CrawlerWorker } from "./crawler";
-import { OpenAiWorker } from "./openai";
-import { SearchIndexingWorker } from "./search";
-
-async function main() {
- const [crawler, openai, search] = [
- await CrawlerWorker.build(),
- await OpenAiWorker.build(),
- await SearchIndexingWorker.build(),
- ];
-
- await Promise.all([crawler.run(), openai.run(), search.run()]);
-}
-
-main();
diff --git a/packages/workers/openai.ts b/packages/workers/openai.ts
deleted file mode 100644
index 1ec22d32..00000000
--- a/packages/workers/openai.ts
+++ /dev/null
@@ -1,263 +0,0 @@
-import { db } from "@hoarder/db";
-import logger from "@hoarder/shared/logger";
-import serverConfig from "@hoarder/shared/config";
-import {
- OpenAIQueue,
- SearchIndexingQueue,
- ZOpenAIRequest,
- queueConnectionDetails,
- zOpenAIRequestSchema,
-} from "@hoarder/shared/queues";
-import { Job } from "bullmq";
-import OpenAI from "openai";
-import { z } from "zod";
-import { Worker } from "bullmq";
-import { bookmarkTags, bookmarks, tagsOnBookmarks } from "@hoarder/db/schema";
-import { and, eq, inArray } from "drizzle-orm";
-
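-// The model is instructed to reply in JSON; this schema validates that reply.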
-const openAIResponseSchema = z.object({
- tags: z.array(z.string()),
-});
-
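-// Best-effort status update: failures here are only logged and never fail
-// the job itself.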
-async function attemptMarkTaggingStatus(
- jobData: object | undefined,
- status: "success" | "failure",
-) {
- if (!jobData) {
- return;
- }
- try {
- const request = zOpenAIRequestSchema.parse(jobData);
- await db
- .update(bookmarks)
- .set({
- taggingStatus: status,
- })
- .where(eq(bookmarks.id, request.bookmarkId));
- } catch (e) {
-    logger.error(`Something went wrong when marking the tagging status: ${e}`);
- }
-}
-
-export class OpenAiWorker {
- static async build() {
- logger.info("Starting openai worker ...");
- const worker = new Worker<ZOpenAIRequest, void>(
- OpenAIQueue.name,
- runOpenAI,
- {
- connection: queueConnectionDetails,
- autorun: false,
- },
- );
-
- worker.on("completed", async (job) => {
- const jobId = job?.id || "unknown";
- logger.info(`[openai][${jobId}] Completed successfully`);
- await attemptMarkTaggingStatus(job?.data, "success");
- });
-
- worker.on("failed", async (job, error) => {
- const jobId = job?.id || "unknown";
- logger.error(`[openai][${jobId}] openai job failed: ${error}`);
- await attemptMarkTaggingStatus(job?.data, "failure");
- });
-
- return worker;
- }
-}
-
-const PROMPT_BASE = `
-I'm building a read-it-later app and I need your help with automatic tagging.
-Please analyze the text after the sentence "CONTENT START HERE:" and suggest relevant tags that describe its key themes, topics, and main ideas.
-Aim for a variety of tags, including broad categories, specific keywords, and potential sub-genres. If it's a well-known website,
-you may also include a tag for the website. Tags should be lowercase and must not contain spaces. If a tag is not generic enough, don't
-include it. Aim for 3-5 tags. If there are no good tags, don't emit any. The content may include cookie-consent and privacy-policy text; ignore those while tagging.
-You must respond in JSON with the key "tags" whose value is a list of tags.
-CONTENT START HERE:
-`;
-
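-// Builds the tagging prompt. Link content is truncated to the first 2000
-// words to keep the prompt within the model's context window.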
-function buildPrompt(
- bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
-) {
- if (bookmark.link) {
- if (!bookmark.link.description && !bookmark.link.content) {
- throw new Error(
- `No content found for link "${bookmark.id}". Skipping ...`,
- );
- }
-
- let content = bookmark.link.content;
- if (content) {
- let words = content.split(" ");
- if (words.length > 2000) {
-      words = words.slice(0, 2000); // keep only the first 2000 words
- content = words.join(" ");
- }
- }
- return `
-${PROMPT_BASE}
-URL: ${bookmark.link.url}
-Title: ${bookmark.link.title || ""}
-Description: ${bookmark.link.description || ""}
-Content: ${content || ""}
- `;
- }
-
- if (bookmark.text) {
- // TODO: Ensure that the content doesn't exceed the context length of openai
- return `
-${PROMPT_BASE}
-${bookmark.text.text}
- `;
- }
-
- throw new Error("Unknown bookmark type");
-}
-
-async function fetchBookmark(linkId: string) {
- return await db.query.bookmarks.findFirst({
- where: eq(bookmarks.id, linkId),
- with: {
- link: true,
- text: true,
- },
- });
-}
-
-async function inferTags(
- jobId: string,
- bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
- openai: OpenAI,
-) {
- const chatCompletion = await openai.chat.completions.create({
- messages: [{ role: "system", content: buildPrompt(bookmark) }],
- model: "gpt-3.5-turbo-0125",
- response_format: { type: "json_object" },
- });
-
- const response = chatCompletion.choices[0].message.content;
- if (!response) {
- throw new Error(`[openai][${jobId}] Got no message content from OpenAI`);
- }
-
- try {
- let tags = openAIResponseSchema.parse(JSON.parse(response)).tags;
- logger.info(
-      `[openai][${jobId}] Inferring tags for bookmark "${bookmark.id}" used ${chatCompletion.usage?.total_tokens} tokens and inferred: ${tags}`,
- );
-
- // Sometimes the tags contain the hashtag symbol, let's strip them out if they do.
- tags = tags.map((t) => {
- if (t.startsWith("#")) {
- return t.slice(1);
- }
- return t;
- });
-
- return tags;
- } catch (e) {
- throw new Error(
- `[openai][${jobId}] Failed to parse JSON response from OpenAI: ${e}`,
- );
- }
-}
-
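-// Replaces the AI-attached tags on a bookmark in one transaction: upsert the
-// tag rows, drop the previous AI attachments, then attach the new set.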
-async function connectTags(
- bookmarkId: string,
- newTags: string[],
- userId: string,
-) {
-  if (newTags.length === 0) {
- return;
- }
-
- await db.transaction(async (tx) => {
- // Create tags that didn't exist previously
- await tx
- .insert(bookmarkTags)
- .values(
- newTags.map((t) => ({
- name: t,
- userId,
- })),
- )
- .onConflictDoNothing();
-
- const newTagIds = (
- await tx.query.bookmarkTags.findMany({
- where: and(
- eq(bookmarkTags.userId, userId),
- inArray(bookmarkTags.name, newTags),
- ),
- columns: {
- id: true,
- },
- })
- ).map((r) => r.id);
-
- // Delete old AI tags
- await tx
- .delete(tagsOnBookmarks)
- .where(
- and(
- eq(tagsOnBookmarks.attachedBy, "ai"),
- eq(tagsOnBookmarks.bookmarkId, bookmarkId),
- ),
- );
-
- // Attach new ones
- await tx
- .insert(tagsOnBookmarks)
- .values(
- newTagIds.map((tagId) => ({
- tagId,
- bookmarkId,
- attachedBy: "ai" as const,
- })),
- )
- .onConflictDoNothing();
- });
-}
-
-async function runOpenAI(job: Job<ZOpenAIRequest, void>) {
- const jobId = job.id || "unknown";
-
- const { openAI } = serverConfig;
-
- if (!openAI.apiKey) {
- logger.debug(
- `[openai][${jobId}] OpenAI is not configured, nothing to do now`,
- );
- return;
- }
-
- const openai = new OpenAI({
- apiKey: openAI.apiKey,
- });
-
- const request = zOpenAIRequestSchema.safeParse(job.data);
- if (!request.success) {
- throw new Error(
- `[openai][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- }
-
- const { bookmarkId } = request.data;
- const bookmark = await fetchBookmark(bookmarkId);
- if (!bookmark) {
- throw new Error(
- `[openai][${jobId}] bookmark with id ${bookmarkId} was not found`,
- );
- }
-
- const tags = await inferTags(jobId, bookmark, openai);
-
- await connectTags(bookmarkId, tags, bookmark.userId);
-
- // Update the search index
-  await SearchIndexingQueue.add("search_indexing", {
- bookmarkId,
- type: "index",
- });
-}
diff --git a/packages/workers/package.json b/packages/workers/package.json
deleted file mode 100644
index f2fc164c..00000000
--- a/packages/workers/package.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
- "$schema": "https://json.schemastore.org/package.json",
- "name": "@hoarder/workers",
- "version": "0.1.0",
- "private": true,
- "dependencies": {
- "@hoarder/db": "workspace:*",
- "@hoarder/shared": "workspace:*",
- "@mozilla/readability": "^0.5.0",
- "@tsconfig/node21": "^21.0.1",
- "async-mutex": "^0.4.1",
- "bullmq": "^5.1.9",
- "dompurify": "^3.0.9",
- "dotenv": "^16.4.1",
- "drizzle-orm": "^0.29.4",
- "jsdom": "^24.0.0",
- "metascraper": "^5.43.4",
- "metascraper-description": "^5.43.4",
- "metascraper-image": "^5.43.4",
- "metascraper-logo": "^5.43.4",
- "metascraper-logo-favicon": "^5.43.4",
- "metascraper-readability": "^5.43.4",
- "metascraper-title": "^5.43.4",
- "metascraper-twitter": "^5.43.4",
- "metascraper-url": "^5.43.4",
- "openai": "^4.26.1",
- "puppeteer": "^22.0.0",
- "puppeteer-extra": "^3.3.6",
- "puppeteer-extra-plugin-adblocker": "^2.13.6",
- "puppeteer-extra-plugin-stealth": "^2.11.2",
- "tsx": "^4.7.1",
- "typescript": "^5",
- "zod": "^3.22.4"
- },
- "devDependencies": {
- "@types/dompurify": "^3.0.5",
- "@types/jsdom": "^21.1.6",
- "@types/metascraper": "^5.14.3"
- },
- "scripts": {
- "start": "tsx watch index.ts",
- "start:prod": "tsx index.ts",
- "typecheck": "tsc --noEmit"
- }
-}
diff --git a/packages/workers/search.ts b/packages/workers/search.ts
deleted file mode 100644
index 618e7c89..00000000
--- a/packages/workers/search.ts
+++ /dev/null
@@ -1,116 +0,0 @@
-import { db } from "@hoarder/db";
-import logger from "@hoarder/shared/logger";
-import { getSearchIdxClient } from "@hoarder/shared/search";
-import {
- SearchIndexingQueue,
- ZSearchIndexingRequest,
- queueConnectionDetails,
- zSearchIndexingRequestSchema,
-} from "@hoarder/shared/queues";
-import { Job } from "bullmq";
-import { Worker } from "bullmq";
-import { bookmarks } from "@hoarder/db/schema";
-import { eq } from "drizzle-orm";
-
-export class SearchIndexingWorker {
- static async build() {
- logger.info("Starting search indexing worker ...");
- const worker = new Worker<ZSearchIndexingRequest, void>(
- SearchIndexingQueue.name,
- runSearchIndexing,
- {
- connection: queueConnectionDetails,
- autorun: false,
- },
- );
-
- worker.on("completed", (job) => {
- const jobId = job?.id || "unknown";
- logger.info(`[search][${jobId}] Completed successfully`);
- });
-
- worker.on("failed", (job, error) => {
- const jobId = job?.id || "unknown";
-      logger.error(`[search][${jobId}] Search indexing job failed: ${error}`);
- });
-
- return worker;
- }
-}
-
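-// Flattens a bookmark (link or text, plus its tag names) into a single
-// document for the search index.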
-async function runIndex(
- searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
- bookmarkId: string,
-) {
- const bookmark = await db.query.bookmarks.findFirst({
- where: eq(bookmarks.id, bookmarkId),
- with: {
- link: true,
- text: true,
- tagsOnBookmarks: {
- with: {
- tag: true,
- },
- },
- },
- });
-
- if (!bookmark) {
- throw new Error(`Bookmark ${bookmarkId} not found`);
- }
-
-  await searchClient.addDocuments([
- {
- id: bookmark.id,
- userId: bookmark.userId,
- ...(bookmark.link
- ? {
- url: bookmark.link.url,
- title: bookmark.link.title,
- description: bookmark.link.description,
- content: bookmark.link.content,
- }
- : undefined),
- ...(bookmark.text ? { content: bookmark.text.text } : undefined),
- tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name),
- },
- ]);
-}
-
-async function runDelete(
- searchClient: NonNullable<Awaited<ReturnType<typeof getSearchIdxClient>>>,
- bookmarkId: string,
-) {
- await searchClient.deleteDocument(bookmarkId);
-}
-
-async function runSearchIndexing(job: Job<ZSearchIndexingRequest, void>) {
- const jobId = job.id || "unknown";
-
- const request = zSearchIndexingRequestSchema.safeParse(job.data);
- if (!request.success) {
- throw new Error(
- `[search][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- }
-
- const searchClient = await getSearchIdxClient();
- if (!searchClient) {
- logger.debug(
- `[search][${jobId}] Search is not configured, nothing to do now`,
- );
- return;
- }
-
- const bookmarkId = request.data.bookmarkId;
- switch (request.data.type) {
- case "index": {
- await runIndex(searchClient, bookmarkId);
- break;
- }
- case "delete": {
- await runDelete(searchClient, bookmarkId);
- break;
- }
- }
-}
diff --git a/packages/workers/tsconfig.json b/packages/workers/tsconfig.json
deleted file mode 100644
index cf49c407..00000000
--- a/packages/workers/tsconfig.json
+++ /dev/null
@@ -1,11 +0,0 @@
-{
- "$schema": "https://json.schemastore.org/tsconfig",
- "extends": "@tsconfig/node21/tsconfig.json",
- "include": ["**/*.ts"],
- "exclude": ["node_modules"],
- "compilerOptions": {
- "module": "ESNext",
- "moduleResolution": "node",
- "esModuleInterop": true
- }
-}