From 2743d9e38ecfdbf757d4d2f97bcf09d601245b59 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 18 May 2025 20:22:59 +0000 Subject: feat: Add AI auto summarization. Fixes #1163 --- apps/workers/assetPreprocessingWorker.ts | 338 --------- apps/workers/crawlerWorker.ts | 877 --------------------- apps/workers/feedWorker.ts | 215 ------ apps/workers/index.ts | 27 +- apps/workers/openaiWorker.ts | 463 ------------ apps/workers/ruleEngineWorker.ts | 86 --- apps/workers/searchWorker.ts | 156 ---- apps/workers/tidyAssetsWorker.ts | 107 --- apps/workers/videoWorker.ts | 214 ------ apps/workers/webhookWorker.ts | 146 ---- apps/workers/workers/assetPreprocessingWorker.ts | 339 +++++++++ apps/workers/workers/crawlerWorker.ts | 882 ++++++++++++++++++++++ apps/workers/workers/feedWorker.ts | 215 ++++++ apps/workers/workers/inference/inferenceWorker.ts | 100 +++ apps/workers/workers/inference/summarize.ts | 123 +++ apps/workers/workers/inference/tagging.ts | 399 ++++++++++ apps/workers/workers/ruleEngineWorker.ts | 86 +++ apps/workers/workers/searchWorker.ts | 156 ++++ apps/workers/workers/tidyAssetsWorker.ts | 107 +++ apps/workers/workers/videoWorker.ts | 214 ++++++ apps/workers/workers/webhookWorker.ts | 146 ++++ 21 files changed, 2780 insertions(+), 2616 deletions(-) delete mode 100644 apps/workers/assetPreprocessingWorker.ts delete mode 100644 apps/workers/crawlerWorker.ts delete mode 100644 apps/workers/feedWorker.ts delete mode 100644 apps/workers/openaiWorker.ts delete mode 100644 apps/workers/ruleEngineWorker.ts delete mode 100644 apps/workers/searchWorker.ts delete mode 100644 apps/workers/tidyAssetsWorker.ts delete mode 100644 apps/workers/videoWorker.ts delete mode 100644 apps/workers/webhookWorker.ts create mode 100644 apps/workers/workers/assetPreprocessingWorker.ts create mode 100644 apps/workers/workers/crawlerWorker.ts create mode 100644 apps/workers/workers/feedWorker.ts create mode 100644 apps/workers/workers/inference/inferenceWorker.ts create mode 100644 apps/workers/workers/inference/summarize.ts create mode 100644 apps/workers/workers/inference/tagging.ts create mode 100644 apps/workers/workers/ruleEngineWorker.ts create mode 100644 apps/workers/workers/searchWorker.ts create mode 100644 apps/workers/workers/tidyAssetsWorker.ts create mode 100644 apps/workers/workers/videoWorker.ts create mode 100644 apps/workers/workers/webhookWorker.ts (limited to 'apps/workers') diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/assetPreprocessingWorker.ts deleted file mode 100644 index a678b706..00000000 --- a/apps/workers/assetPreprocessingWorker.ts +++ /dev/null @@ -1,338 +0,0 @@ -import os from "os"; -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import PDFParser from "pdf2json"; -import { fromBuffer } from "pdf2pic"; -import { createWorker } from "tesseract.js"; - -import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - assets, - AssetTypes, - bookmarkAssets, - bookmarks, -} from "@karakeep/db/schema"; -import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - AssetPreprocessingQueue, - OpenAIQueue, - triggerSearchReindex, -} from "@karakeep/shared/queues"; - -export class AssetPreprocessingWorker { - static build() { - logger.info("Starting asset preprocessing worker ..."); - const worker = new Runner( - AssetPreprocessingQueue, - { - 
run: run,
-        onComplete: async (job) => {
-          const jobId = job.id;
-          logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
-          return Promise.resolve();
-        },
-        onError: async (job) => {
-          const jobId = job.id;
-          logger.error(
-            `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
-          );
-          return Promise.resolve();
-        },
-      },
-      {
-        concurrency: 1,
-        pollIntervalMs: 1000,
-        timeoutSecs: 30,
-      },
-    );
-
-    return worker;
-  }
-}
-
-async function readImageText(buffer: Buffer) {
-  if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
-    return null;
-  }
-  const worker = await createWorker(serverConfig.ocr.langs, undefined, {
-    cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
-  });
-  try {
-    const ret = await worker.recognize(buffer);
-    if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
-      return null;
-    }
-    return ret.data.text;
-  } finally {
-    await worker.terminate();
-  }
-}
-
-async function readPDFText(buffer: Buffer): Promise<{
-  text: string;
-  metadata: Record<string, string>;
-}> {
-  return new Promise((resolve, reject) => {
-    const pdfParser = new PDFParser(null, true);
-    pdfParser.on("pdfParser_dataError", reject);
-    pdfParser.on("pdfParser_dataReady", (pdfData) => {
-      resolve({
-        text: pdfParser.getRawTextContent(),
-        metadata: pdfData.Meta,
-      });
-    });
-    pdfParser.parseBuffer(buffer);
-  });
-}
-
-export async function extractAndSavePDFScreenshot(
-  jobId: string,
-  asset: Buffer,
-  bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
-  isFixMode: boolean,
-): Promise<boolean> {
-  {
-    const alreadyHasScreenshot =
-      bookmark.assets.find(
-        (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT,
-      ) !== undefined;
-    if (alreadyHasScreenshot && isFixMode) {
-      logger.info(
-        `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`,
-      );
-      return false;
-    }
-  }
-  logger.info(
-    `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`,
-  );
-  try {
-    /**
-     * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL
-     * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md
-     */
-    const screenshot = await fromBuffer(asset, {
-      density: 100,
-      quality: 100,
-      format: "png",
-      preserveAspectRatio: true,
-    })(1, { responseType: "buffer" });
-
-    if (!screenshot.buffer) {
-      logger.error(
-        `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`,
-      );
-      return false;
-    }
-
-    // Store the screenshot
-    const assetId = newAssetId();
-    const fileName = "screenshot.png";
-    const contentType = "image/png";
-    await saveAsset({
-      userId: bookmark.userId,
-      assetId,
-      asset: screenshot.buffer,
-      metadata: {
-        contentType,
-        fileName,
-      },
-    });
-
-    // Insert into database
-    await db.insert(assets).values({
-      id: assetId,
-      bookmarkId: bookmark.id,
-      userId: bookmark.userId,
-      assetType: AssetTypes.ASSET_SCREENSHOT,
-      contentType,
-      size: screenshot.buffer.byteLength,
-      fileName,
-    });
-
-    logger.info(
-      `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`,
-    );
-    return true;
-  } catch (error) {
-    logger.error(
-      `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`,
-    );
-    return false;
-  }
-}
-
-async function extractAndSaveImageText(
-  jobId: string,
-  asset: Buffer,
-  bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
-  isFixMode: boolean,
-): Promise<boolean> {
-  {
-    const alreadyHasText = !!bookmark.asset.content;
-    if (alreadyHasText && isFixMode) {
-      logger.info(
-        `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`,
-      );
-      return false;
-    }
-  }
-  let imageText = null;
-  logger.info(
-    `[assetPreprocessing][${jobId}] Attempting to extract text from image.`,
-  );
-  try {
-    imageText = await readImageText(asset);
-  } catch (e) {
-    logger.error(
-      `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
-    );
-  }
-  if (!imageText) {
-    return false;
-  }
-
-  logger.info(
-    `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`,
-  );
-  await db
-    .update(bookmarkAssets)
-    .set({
-      content: imageText,
-      metadata: null,
-    })
-    .where(eq(bookmarkAssets.id, bookmark.id));
-  return true;
-}
-
-async function extractAndSavePDFText(
-  jobId: string,
-  asset: Buffer,
-  bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
-  isFixMode: boolean,
-): Promise<boolean> {
-  {
-    const alreadyHasText = !!bookmark.asset.content;
-    if (alreadyHasText && isFixMode) {
-      logger.info(
-        `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`,
-      );
-      return false;
-    }
-  }
-  logger.info(
-    `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`,
-  );
-  const pdfParse = await readPDFText(asset);
-  if (!pdfParse?.text) {
-    throw new Error(
-      `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
-    );
-  }
-  logger.info(
-    `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`,
-  );
-  await db
-    .update(bookmarkAssets)
-    .set({
-      content: pdfParse.text,
-      metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null,
-    })
-    .where(eq(bookmarkAssets.id, bookmark.id));
-  return true;
-}
-
-async function getBookmark(bookmarkId: string) {
-  return db.query.bookmarks.findFirst({
-    where: eq(bookmarks.id, bookmarkId),
-    with: {
-      asset: true,
-      assets: true,
-    },
-  });
-}
-
-async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
-  const isFixMode = req.data.fixMode;
-  const jobId = req.id;
-  const bookmarkId = req.data.bookmarkId;
-
-  const bookmark = await db.query.bookmarks.findFirst({
-    where: eq(bookmarks.id, bookmarkId),
-    with: {
-      asset: true,
-      assets: true,
-    },
-  });
-
-  logger.info(
-    `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`,
-  );
-
-  if (!bookmark) {
-    throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`);
-  }
-
-  if (!bookmark.asset) {
-    throw new Error(
-      `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`,
-    );
-  }
-
-  const { asset } = await readAsset({
-    userId: bookmark.userId,
-    assetId: bookmark.asset.assetId,
-  });
-
-  if (!asset) {
-    throw new Error(
-      `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`,
-    );
-  }
-
-  let anythingChanged = false;
-  switch (bookmark.asset.assetType) {
-    case "image": {
-      const extractedText = await extractAndSaveImageText(
-        jobId,
-        asset,
-        bookmark,
-        isFixMode,
-      );
-      anythingChanged ||= extractedText;
-      break;
-    }
-    case "pdf": {
-      const extractedText = await extractAndSavePDFText(
-        jobId,
-        asset,
-        bookmark,
-        isFixMode,
-      );
-      const extractedScreenshot = await extractAndSavePDFScreenshot(
-        jobId,
-        asset,
-        bookmark,
-        isFixMode,
-      );
-      anythingChanged ||= extractedText || extractedScreenshot;
-      break;
-    }
-    default:
-      throw new Error(
-        `[assetPreprocessing][${jobId}] Unsupported bookmark type`,
-      );
-  }
-
-  if (!isFixMode || anythingChanged) {
-    await OpenAIQueue.enqueue({
bookmarkId, - }); - - // Update the search index - await triggerSearchReindex(bookmarkId); - } -} diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts deleted file mode 100644 index a40cbe53..00000000 --- a/apps/workers/crawlerWorker.ts +++ /dev/null @@ -1,877 +0,0 @@ -import * as dns from "dns"; -import { promises as fs } from "fs"; -import * as path from "node:path"; -import * as os from "os"; -import type { Browser } from "puppeteer"; -import { PuppeteerBlocker } from "@ghostery/adblocker-puppeteer"; -import { Readability } from "@mozilla/readability"; -import { Mutex } from "async-mutex"; -import DOMPurify from "dompurify"; -import { eq } from "drizzle-orm"; -import { execa } from "execa"; -import { isShuttingDown } from "exit"; -import { JSDOM } from "jsdom"; -import { DequeuedJob, Runner } from "liteque"; -import metascraper from "metascraper"; -import metascraperAmazon from "metascraper-amazon"; -import metascraperAuthor from "metascraper-author"; -import metascraperDate from "metascraper-date"; -import metascraperDescription from "metascraper-description"; -import metascraperImage from "metascraper-image"; -import metascraperLogo from "metascraper-logo-favicon"; -import metascraperPublisher from "metascraper-publisher"; -import metascraperReadability from "metascraper-readability"; -import metascraperTitle from "metascraper-title"; -import metascraperTwitter from "metascraper-twitter"; -import metascraperUrl from "metascraper-url"; -import fetch from "node-fetch"; -import puppeteer from "puppeteer-extra"; -import StealthPlugin from "puppeteer-extra-plugin-stealth"; -import { withTimeout } from "utils"; -import { getBookmarkDetails, updateAsset } from "workerUtils"; - -import type { ZCrawlLinkRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - assets, - AssetTypes, - bookmarkAssets, - bookmarkLinks, - bookmarks, -} from "@karakeep/db/schema"; -import { - ASSET_TYPES, - getAssetSize, - IMAGE_ASSET_TYPES, - newAssetId, - readAsset, - saveAsset, - saveAssetFromFile, - silentDeleteAsset, - SUPPORTED_UPLOAD_ASSET_TYPES, -} from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - AssetPreprocessingQueue, - LinkCrawlerQueue, - OpenAIQueue, - triggerSearchReindex, - triggerVideoWorker, - triggerWebhook, - zCrawlLinkRequestSchema, -} from "@karakeep/shared/queues"; -import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; - -const metascraperParser = metascraper([ - metascraperDate({ - dateModified: true, - datePublished: true, - }), - metascraperAmazon(), - metascraperReadability(), - metascraperAuthor(), - metascraperPublisher(), - metascraperTitle(), - metascraperDescription(), - metascraperTwitter(), - metascraperImage(), - metascraperLogo(), - metascraperUrl(), -]); - -let globalBrowser: Browser | undefined; -let globalBlocker: PuppeteerBlocker | undefined; -// Guards the interactions with the browser instance. -// This is needed given that most of the browser APIs are async. 
-const browserMutex = new Mutex(); - -async function startBrowserInstance() { - const defaultViewport = { - width: 1440, - height: 900, - }; - if (serverConfig.crawler.browserWebSocketUrl) { - logger.info( - `[Crawler] Connecting to existing browser websocket address: ${serverConfig.crawler.browserWebSocketUrl}`, - ); - return puppeteer.connect({ - browserWSEndpoint: serverConfig.crawler.browserWebSocketUrl, - defaultViewport, - }); - } else if (serverConfig.crawler.browserWebUrl) { - logger.info( - `[Crawler] Connecting to existing browser instance: ${serverConfig.crawler.browserWebUrl}`, - ); - const webUrl = new URL(serverConfig.crawler.browserWebUrl); - // We need to resolve the ip address as a workaround for https://github.com/puppeteer/puppeteer/issues/2242 - const { address: address } = await dns.promises.lookup(webUrl.hostname); - webUrl.hostname = address; - logger.info( - `[Crawler] Successfully resolved IP address, new address: ${webUrl.toString()}`, - ); - return puppeteer.connect({ - browserURL: webUrl.toString(), - defaultViewport, - }); - } else { - logger.info(`Running in browserless mode`); - return undefined; - } -} - -async function launchBrowser() { - globalBrowser = undefined; - await browserMutex.runExclusive(async () => { - try { - globalBrowser = await startBrowserInstance(); - } catch (e) { - logger.error( - `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${(e as Error).stack}`, - ); - if (isShuttingDown) { - logger.info("[Crawler] We're shutting down so won't retry."); - return; - } - setTimeout(() => { - launchBrowser(); - }, 5000); - return; - } - globalBrowser?.on("disconnected", () => { - if (isShuttingDown) { - logger.info( - "[Crawler] The puppeteer browser got disconnected. But we're shutting down so won't restart it.", - ); - return; - } - logger.info( - "[Crawler] The puppeteer browser got disconnected. Will attempt to launch it again.", - ); - launchBrowser(); - }); - }); -} - -export class CrawlerWorker { - static async build() { - puppeteer.use(StealthPlugin()); - if (serverConfig.crawler.enableAdblocker) { - try { - logger.info("[crawler] Loading adblocker ..."); - globalBlocker = await PuppeteerBlocker.fromPrebuiltFull(fetch, { - path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), - read: fs.readFile, - write: fs.writeFile, - }); - } catch (e) { - logger.error( - `[crawler] Failed to load adblocker. 
Will not be blocking ads: ${e}`, - ); - } - } - if (!serverConfig.crawler.browserConnectOnDemand) { - await launchBrowser(); - } else { - logger.info( - "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance", - ); - } - - logger.info("Starting crawler worker ..."); - const worker = new Runner( - LinkCrawlerQueue, - { - run: withTimeout( - runCrawler, - /* timeoutSec */ serverConfig.crawler.jobTimeoutSec, - ), - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[Crawler][${jobId}] Completed successfully`); - const bookmarkId = job.data.bookmarkId; - if (bookmarkId) { - await changeBookmarkStatus(bookmarkId, "success"); - } - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`, - ); - const bookmarkId = job.data?.bookmarkId; - if (bookmarkId && job.numRetriesLeft == 0) { - await changeBookmarkStatus(bookmarkId, "failure"); - } - }, - }, - { - pollIntervalMs: 1000, - timeoutSecs: serverConfig.crawler.jobTimeoutSec, - concurrency: serverConfig.crawler.numWorkers, - }, - ); - - return worker; - } -} - -type DBAssetType = typeof assets.$inferInsert; - -async function changeBookmarkStatus( - bookmarkId: string, - crawlStatus: "success" | "failure", -) { - await db - .update(bookmarkLinks) - .set({ - crawlStatus, - }) - .where(eq(bookmarkLinks.id, bookmarkId)); -} - -/** - * This provides some "basic" protection from malicious URLs. However, all of those - * can be easily circumvented by pointing dns of origin to localhost, or with - * redirects. - */ -function validateUrl(url: string) { - const urlParsed = new URL(url); - if (urlParsed.protocol != "http:" && urlParsed.protocol != "https:") { - throw new Error(`Unsupported URL protocol: ${urlParsed.protocol}`); - } - - if (["localhost", "127.0.0.1", "0.0.0.0"].includes(urlParsed.hostname)) { - throw new Error(`Link hostname rejected: ${urlParsed.hostname}`); - } -} - -async function browserlessCrawlPage( - jobId: string, - url: string, - abortSignal: AbortSignal, -) { - logger.info( - `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, - ); - const response = await fetch(url, { - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - logger.info( - `[Crawler][${jobId}] Successfully fetched the content of "${url}". 
Status: ${response.status}, Size: ${response.size}`, - ); - return { - htmlContent: await response.text(), - statusCode: response.status, - screenshot: undefined, - url: response.url, - }; -} - -async function crawlPage( - jobId: string, - url: string, - abortSignal: AbortSignal, -): Promise<{ - htmlContent: string; - screenshot: Buffer | undefined; - statusCode: number; - url: string; -}> { - let browser: Browser | undefined; - if (serverConfig.crawler.browserConnectOnDemand) { - browser = await startBrowserInstance(); - } else { - browser = globalBrowser; - } - if (!browser) { - return browserlessCrawlPage(jobId, url, abortSignal); - } - const context = await browser.createBrowserContext(); - - try { - const page = await context.newPage(); - if (globalBlocker) { - await globalBlocker.enableBlockingInPage(page); - } - await page.setUserAgent( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - ); - - const response = await page.goto(url, { - timeout: serverConfig.crawler.navigateTimeoutSec * 1000, - }); - logger.info( - `[Crawler][${jobId}] Successfully navigated to "${url}". Waiting for the page to load ...`, - ); - - // Wait until there's at most two connections for 2 seconds - // Attempt to wait only for 5 seconds - await Promise.race([ - page.waitForNetworkIdle({ - idleTime: 1000, // 1 sec - concurrency: 2, - }), - new Promise((f) => setTimeout(f, 5000)), - ]); - - logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`); - - const htmlContent = await page.content(); - logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`); - - let screenshot: Buffer | undefined = undefined; - if (serverConfig.crawler.storeScreenshot) { - try { - screenshot = await Promise.race([ - page.screenshot({ - // If you change this, you need to change the asset type in the store function. - type: "png", - encoding: "binary", - fullPage: serverConfig.crawler.fullPageScreenshot, - }), - new Promise((_, reject) => - setTimeout( - () => - reject( - "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", - ), - serverConfig.crawler.screenshotTimeoutSec * 1000, - ), - ), - ]); - logger.info( - `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, - ); - } catch (e) { - logger.warn( - `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${e}`, - ); - } - } - - return { - htmlContent, - statusCode: response?.status() ?? 0, - screenshot, - url: page.url(), - }; - } finally { - await context.close(); - if (serverConfig.crawler.browserConnectOnDemand) { - await browser.close(); - } - } -} - -async function extractMetadata( - htmlContent: string, - url: string, - jobId: string, -) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, - ); - const meta = await metascraperParser({ - url, - html: htmlContent, - // We don't want to validate the URL again as we've already done it by visiting the page. - // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). 
- validateUrl: false, - }); - logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`); - return meta; -} - -function extractReadableContent( - htmlContent: string, - url: string, - jobId: string, -) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract readable content ...`, - ); - const dom = new JSDOM(htmlContent, { url }); - const readableContent = new Readability(dom.window.document).parse(); - if (!readableContent || typeof readableContent.content !== "string") { - return null; - } - - const window = new JSDOM("").window; - const purify = DOMPurify(window); - const purifiedHTML = purify.sanitize(readableContent.content); - - logger.info(`[Crawler][${jobId}] Done extracting readable content.`); - return { - content: purifiedHTML, - textContent: readableContent.textContent, - }; -} - -async function storeScreenshot( - screenshot: Buffer | undefined, - userId: string, - jobId: string, -) { - if (!serverConfig.crawler.storeScreenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, - ); - return null; - } - if (!screenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, - ); - return null; - } - const assetId = newAssetId(); - const contentType = "image/png"; - const fileName = "screenshot.png"; - await saveAsset({ - userId, - assetId, - metadata: { contentType, fileName }, - asset: screenshot, - }); - logger.info( - `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, - ); - return { assetId, contentType, fileName, size: screenshot.byteLength }; -} - -async function downloadAndStoreFile( - url: string, - userId: string, - jobId: string, - fileType: string, - abortSignal: AbortSignal, -) { - try { - logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`); - const response = await fetch(url, { - signal: abortSignal, - }); - if (!response.ok) { - throw new Error(`Failed to download ${fileType}: ${response.status}`); - } - const buffer = await response.arrayBuffer(); - const assetId = newAssetId(); - - const contentType = response.headers.get("content-type"); - if (!contentType) { - throw new Error("No content type in the response"); - } - - await saveAsset({ - userId, - assetId, - metadata: { contentType }, - asset: Buffer.from(buffer), - }); - - logger.info( - `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId}`, - ); - - return { assetId, userId, contentType, size: buffer.byteLength }; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, - ); - return null; - } -} - -async function downloadAndStoreImage( - url: string, - userId: string, - jobId: string, - abortSignal: AbortSignal, -) { - if (!serverConfig.crawler.downloadBannerImage) { - logger.info( - `[Crawler][${jobId}] Skipping downloading the image as per the config.`, - ); - return null; - } - return downloadAndStoreFile(url, userId, jobId, "image", abortSignal); -} - -async function archiveWebpage( - html: string, - url: string, - userId: string, - jobId: string, - abortSignal: AbortSignal, -) { - logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); - const assetId = newAssetId(); - const assetPath = `/tmp/${assetId}`; - - await execa({ - input: html, - cancelSignal: abortSignal, - })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]); - - const contentType = "text/html"; - - await saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata: { - contentType, - }, - }); - - logger.info( 
- `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, - ); - - return { - assetId, - contentType, - size: await getAssetSize({ userId, assetId }), - }; -} - -async function getContentType( - url: string, - jobId: string, - abortSignal: AbortSignal, -): Promise { - try { - logger.info( - `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, - ); - const response = await fetch(url, { - method: "HEAD", - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - const contentType = response.headers.get("content-type"); - logger.info( - `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, - ); - return contentType; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, - ); - return null; - } -} - -/** - * Downloads the asset from the URL and transforms the linkBookmark to an assetBookmark - * @param url the url the user provided - * @param assetType the type of the asset we're downloading - * @param userId the id of the user - * @param jobId the id of the job for logging - * @param bookmarkId the id of the bookmark - */ -async function handleAsAssetBookmark( - url: string, - assetType: "image" | "pdf", - userId: string, - jobId: string, - bookmarkId: string, - abortSignal: AbortSignal, -) { - const downloaded = await downloadAndStoreFile( - url, - userId, - jobId, - assetType, - abortSignal, - ); - if (!downloaded) { - return; - } - const fileName = path.basename(new URL(url).pathname); - await db.transaction(async (trx) => { - await updateAsset( - undefined, - { - id: downloaded.assetId, - bookmarkId, - userId, - assetType: AssetTypes.BOOKMARK_ASSET, - contentType: downloaded.contentType, - size: downloaded.size, - fileName, - }, - trx, - ); - await trx.insert(bookmarkAssets).values({ - id: bookmarkId, - assetType, - assetId: downloaded.assetId, - content: null, - fileName, - sourceUrl: url, - }); - // Switch the type of the bookmark from LINK to ASSET - await trx - .update(bookmarks) - .set({ type: BookmarkTypes.ASSET }) - .where(eq(bookmarks.id, bookmarkId)); - await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); - }); - await AssetPreprocessingQueue.enqueue({ - bookmarkId, - fixMode: false, - }); -} - -async function crawlAndParseUrl( - url: string, - userId: string, - jobId: string, - bookmarkId: string, - oldScreenshotAssetId: string | undefined, - oldImageAssetId: string | undefined, - oldFullPageArchiveAssetId: string | undefined, - precrawledArchiveAssetId: string | undefined, - archiveFullPage: boolean, - abortSignal: AbortSignal, -) { - let result: { - htmlContent: string; - screenshot: Buffer | undefined; - statusCode: number | null; - url: string; - }; - - if (precrawledArchiveAssetId) { - logger.info( - `[Crawler][${jobId}] The page has been precrawled. 
Will use the precrawled archive instead.`, - ); - const asset = await readAsset({ - userId, - assetId: precrawledArchiveAssetId, - }); - result = { - htmlContent: asset.asset.toString(), - screenshot: undefined, - statusCode: 200, - url, - }; - } else { - result = await crawlPage(jobId, url, abortSignal); - } - abortSignal.throwIfAborted(); - - const { htmlContent, screenshot, statusCode, url: browserUrl } = result; - - const [meta, readableContent, screenshotAssetInfo] = await Promise.all([ - extractMetadata(htmlContent, browserUrl, jobId), - extractReadableContent(htmlContent, browserUrl, jobId), - storeScreenshot(screenshot, userId, jobId), - ]); - abortSignal.throwIfAborted(); - let imageAssetInfo: DBAssetType | null = null; - if (meta.image) { - const downloaded = await downloadAndStoreImage( - meta.image, - userId, - jobId, - abortSignal, - ); - if (downloaded) { - imageAssetInfo = { - id: downloaded.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_BANNER_IMAGE, - contentType: downloaded.contentType, - size: downloaded.size, - }; - } - } - abortSignal.throwIfAborted(); - - const parseDate = (date: string | undefined) => { - if (!date) { - return null; - } - try { - return new Date(date); - } catch (_e) { - return null; - } - }; - - // TODO(important): Restrict the size of content to store - await db.transaction(async (txn) => { - await txn - .update(bookmarkLinks) - .set({ - title: meta.title, - description: meta.description, - // Don't store data URIs as they're not valid URLs and are usually quite large - imageUrl: meta.image?.startsWith("data:") ? null : meta.image, - favicon: meta.logo, - content: readableContent?.textContent, - htmlContent: readableContent?.content, - crawledAt: new Date(), - crawlStatusCode: statusCode, - author: meta.author, - publisher: meta.publisher, - datePublished: parseDate(meta.datePublished), - dateModified: parseDate(meta.dateModified), - }) - .where(eq(bookmarkLinks.id, bookmarkId)); - - if (screenshotAssetInfo) { - await updateAsset( - oldScreenshotAssetId, - { - id: screenshotAssetInfo.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_SCREENSHOT, - contentType: screenshotAssetInfo.contentType, - size: screenshotAssetInfo.size, - fileName: screenshotAssetInfo.fileName, - }, - txn, - ); - } - if (imageAssetInfo) { - await updateAsset(oldImageAssetId, imageAssetInfo, txn); - } - }); - - // Delete the old assets if any - await Promise.all([ - silentDeleteAsset(userId, oldScreenshotAssetId), - silentDeleteAsset(userId, oldImageAssetId), - ]); - - return async () => { - if ( - !precrawledArchiveAssetId && - (serverConfig.crawler.fullPageArchive || archiveFullPage) - ) { - const { - assetId: fullPageArchiveAssetId, - size, - contentType, - } = await archiveWebpage( - htmlContent, - browserUrl, - userId, - jobId, - abortSignal, - ); - - await db.transaction(async (txn) => { - await updateAsset( - oldFullPageArchiveAssetId, - { - id: fullPageArchiveAssetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, - contentType, - size, - fileName: null, - }, - txn, - ); - }); - if (oldFullPageArchiveAssetId) { - silentDeleteAsset(userId, oldFullPageArchiveAssetId); - } - } - }; -} - -async function runCrawler(job: DequeuedJob) { - const jobId = job.id ?? 
"unknown"; - - const request = zCrawlLinkRequestSchema.safeParse(job.data); - if (!request.success) { - logger.error( - `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - return; - } - - const { bookmarkId, archiveFullPage } = request.data; - const { - url, - userId, - screenshotAssetId: oldScreenshotAssetId, - imageAssetId: oldImageAssetId, - fullPageArchiveAssetId: oldFullPageArchiveAssetId, - precrawledArchiveAssetId, - } = await getBookmarkDetails(bookmarkId); - - logger.info( - `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, - ); - validateUrl(url); - - const contentType = await getContentType(url, jobId, job.abortSignal); - - // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage - const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF; - - if (isPdf) { - await handleAsAssetBookmark( - url, - "pdf", - userId, - jobId, - bookmarkId, - job.abortSignal, - ); - } else if ( - contentType && - IMAGE_ASSET_TYPES.has(contentType) && - SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType) - ) { - await handleAsAssetBookmark( - url, - "image", - userId, - jobId, - bookmarkId, - job.abortSignal, - ); - } else { - const archivalLogic = await crawlAndParseUrl( - url, - userId, - jobId, - bookmarkId, - oldScreenshotAssetId, - oldImageAssetId, - oldFullPageArchiveAssetId, - precrawledArchiveAssetId, - archiveFullPage, - job.abortSignal, - ); - - // Enqueue openai job (if not set, assume it's true for backward compatibility) - if (job.data.runInference !== false) { - await OpenAIQueue.enqueue({ - bookmarkId, - }); - } - - // Update the search index - await triggerSearchReindex(bookmarkId); - - // Trigger a potential download of a video from the URL - await triggerVideoWorker(bookmarkId, url); - - // Trigger a webhook - await triggerWebhook(bookmarkId, "crawled"); - - // Do the archival as a separate last step as it has the potential for failure - await archivalLogic(); - } -} diff --git a/apps/workers/feedWorker.ts b/apps/workers/feedWorker.ts deleted file mode 100644 index 1eaba0c3..00000000 --- a/apps/workers/feedWorker.ts +++ /dev/null @@ -1,215 +0,0 @@ -import { and, eq, inArray } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import cron from "node-cron"; -import Parser from "rss-parser"; -import { buildImpersonatingTRPCClient } from "trpc"; -import { z } from "zod"; - -import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { FeedQueue } from "@karakeep/shared/queues"; -import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; - -export const FeedRefreshingWorker = cron.schedule( - "0 * * * *", - () => { - logger.info("[feed] Scheduling feed refreshing jobs ..."); - db.query.rssFeedsTable - .findMany({ - columns: { - id: true, - }, - where: eq(rssFeedsTable.enabled, true), - }) - .then((feeds) => { - for (const feed of feeds) { - FeedQueue.enqueue( - { - feedId: feed.id, - }, - { - idempotencyKey: feed.id, - }, - ); - } - }); - }, - { - runOnInit: false, - scheduled: false, - }, -); - -export class FeedWorker { - static build() { - logger.info("Starting feed worker ..."); - const worker = new Runner( - FeedQueue, - { - run: run, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[feed][${jobId}] Completed successfully`); - await db - .update(rssFeedsTable) - 
.set({ lastFetchedStatus: "success", lastFetchedAt: new Date() }) - .where(eq(rssFeedsTable.id, job.data?.feedId)); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, - ); - if (job.data) { - await db - .update(rssFeedsTable) - .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() }) - .where(eq(rssFeedsTable.id, job.data?.feedId)); - } - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function run(req: DequeuedJob) { - const jobId = req.id; - const feed = await db.query.rssFeedsTable.findFirst({ - where: eq(rssFeedsTable.id, req.data.feedId), - }); - if (!feed) { - throw new Error( - `[feed][${jobId}] Feed with id ${req.data.feedId} not found`, - ); - } - logger.info( - `[feed][${jobId}] Starting fetching feed "${feed.name}" (${feed.id}) ...`, - ); - - const response = await fetch(feed.url, { - signal: AbortSignal.timeout(5000), - headers: { - UserAgent: - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - Accept: "application/rss+xml", - }, - }); - if (response.status !== 200) { - throw new Error( - `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) returned a non-success status: ${response.status}.`, - ); - } - const contentType = response.headers.get("content-type"); - if (!contentType || !contentType.includes("xml")) { - throw new Error( - `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) is not a valid RSS feed`, - ); - } - const xmlData = await response.text(); - - logger.info( - `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`, - ); - - const parser = new Parser({ - customFields: { - item: ["id"], - }, - }); - const unparseFeedData = await parser.parseString(xmlData); - - // Apparently, we can't trust the output of the xml parser. So let's do our own type - // validation. - const feedItemsSchema = z.object({ - id: z.coerce.string(), - link: z.string().optional(), - guid: z.string().optional(), - }); - - const feedItems = unparseFeedData.items - .map((i) => feedItemsSchema.safeParse(i)) - .flatMap((i) => (i.success ? [i.data] : [])); - - logger.info( - `[feed][${jobId}] Found ${feedItems.length} entries in feed "${feed.name}" (${feed.id}) ...`, - ); - - if (feedItems.length === 0) { - logger.info(`[feed][${jobId}] No entries found.`); - return; - } - - // For feeds that don't have guids, use the link as the id - feedItems.forEach((item) => { - item.guid = item.guid ?? `${item.id}` ?? 
item.link; - }); - - const exitingEntries = await db.query.rssFeedImportsTable.findMany({ - where: and( - eq(rssFeedImportsTable.rssFeedId, feed.id), - inArray( - rssFeedImportsTable.entryId, - feedItems.map((item) => item.guid).filter((id): id is string => !!id), - ), - ), - }); - - const newEntries = feedItems.filter( - (item) => - !exitingEntries.some((entry) => entry.entryId === item.guid) && - item.link && - item.guid, - ); - - if (newEntries.length === 0) { - logger.info( - `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`, - ); - return; - } - - logger.info( - `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`, - ); - - const trpcClient = await buildImpersonatingTRPCClient(feed.userId); - - const createdBookmarks = await Promise.allSettled( - newEntries.map((item) => - trpcClient.bookmarks.createBookmark({ - type: BookmarkTypes.LINK, - url: item.link!, - }), - ), - ); - - // It's ok if this is not transactional as the bookmarks will get linked in the next iteration. - await db - .insert(rssFeedImportsTable) - .values( - newEntries.map((item, idx) => { - const b = createdBookmarks[idx]; - return { - entryId: item.guid!, - bookmarkId: b.status === "fulfilled" ? b.value.id : null, - rssFeedId: feed.id, - }; - }), - ) - .onConflictDoNothing(); - - logger.info( - `[feed][${jobId}] Successfully imported ${newEntries.length} new enteries from feed "${feed.name}" (${feed.id}).`, - ); - - return Promise.resolve(); -} diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 208666c7..1cc1ce49 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -1,20 +1,19 @@ import "dotenv/config"; -import { AssetPreprocessingWorker } from "assetPreprocessingWorker"; -import { FeedRefreshingWorker, FeedWorker } from "feedWorker"; -import { RuleEngineWorker } from "ruleEngineWorker"; -import { TidyAssetsWorker } from "tidyAssetsWorker"; - import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { runQueueDBMigrations } from "@karakeep/shared/queues"; -import { CrawlerWorker } from "./crawlerWorker"; import { shutdownPromise } from "./exit"; -import { OpenAiWorker } from "./openaiWorker"; -import { SearchIndexingWorker } from "./searchWorker"; -import { VideoWorker } from "./videoWorker"; -import { WebhookWorker } from "./webhookWorker"; +import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; +import { CrawlerWorker } from "./workers/crawlerWorker"; +import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; +import { OpenAiWorker } from "./workers/inference/inferenceWorker"; +import { RuleEngineWorker } from "./workers/ruleEngineWorker"; +import { SearchIndexingWorker } from "./workers/searchWorker"; +import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; +import { VideoWorker } from "./workers/videoWorker"; +import { WebhookWorker } from "./workers/webhookWorker"; async function main() { logger.info(`Workers version: ${serverConfig.serverVersion ?? 
"not set"}`); @@ -22,7 +21,7 @@ async function main() { const [ crawler, - openai, + inference, search, tidyAssets, video, @@ -46,7 +45,7 @@ async function main() { await Promise.any([ Promise.all([ crawler.run(), - openai.run(), + inference.run(), search.run(), tidyAssets.run(), video.run(), @@ -58,12 +57,12 @@ async function main() { shutdownPromise, ]); logger.info( - "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", + "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", ); FeedRefreshingWorker.stop(); crawler.stop(); - openai.stop(); + inference.stop(); search.stop(); tidyAssets.stop(); video.stop(); diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts deleted file mode 100644 index c8b2770e..00000000 --- a/apps/workers/openaiWorker.ts +++ /dev/null @@ -1,463 +0,0 @@ -import { and, Column, eq, inArray, sql } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import { buildImpersonatingTRPCClient } from "trpc"; -import { z } from "zod"; - -import type { InferenceClient } from "@karakeep/shared/inference"; -import type { ZOpenAIRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - bookmarks, - bookmarkTags, - customPrompts, - tagsOnBookmarks, -} from "@karakeep/db/schema"; -import { readAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import { InferenceClientFactory } from "@karakeep/shared/inference"; -import logger from "@karakeep/shared/logger"; -import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; -import { - OpenAIQueue, - triggerRuleEngineOnEvent, - triggerSearchReindex, - triggerWebhook, - zOpenAIRequestSchema, -} from "@karakeep/shared/queues"; - -const openAIResponseSchema = z.object({ - tags: z.array(z.string()), -}); - -function tagNormalizer(col: Column) { - function normalizeTag(tag: string) { - return tag.toLowerCase().replace(/[ \-_]/g, ""); - } - - return { - normalizeTag, - sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`, - }; -} - -async function attemptMarkTaggingStatus( - jobData: object | undefined, - status: "success" | "failure", -) { - if (!jobData) { - return; - } - try { - const request = zOpenAIRequestSchema.parse(jobData); - await db - .update(bookmarks) - .set({ - taggingStatus: status, - }) - .where(eq(bookmarks.id, request.bookmarkId)); - } catch (e) { - logger.error(`Something went wrong when marking the tagging status: ${e}`); - } -} - -export class OpenAiWorker { - static build() { - logger.info("Starting inference worker ..."); - const worker = new Runner( - OpenAIQueue, - { - run: runOpenAI, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[inference][${jobId}] Completed successfully`); - await attemptMarkTaggingStatus(job.data, "success"); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`, - ); - if (job.numRetriesLeft == 0) { - await attemptMarkTaggingStatus(job?.data, "failure"); - } - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: serverConfig.inference.jobTimeoutSec, - }, - ); - - return worker; - } -} - -async function buildPrompt( - bookmark: NonNullable>>, -) { - const prompts = await fetchCustomPrompts(bookmark.userId, "text"); - if (bookmark.link) { - if (!bookmark.link.description && 
!bookmark.link.content) { - throw new Error( - `No content found for link "${bookmark.id}". Skipping ...`, - ); - } - - const content = bookmark.link.content; - return buildTextPrompt( - serverConfig.inference.inferredTagLang, - prompts, - `URL: ${bookmark.link.url} -Title: ${bookmark.link.title ?? ""} -Description: ${bookmark.link.description ?? ""} -Content: ${content ?? ""}`, - serverConfig.inference.contextLength, - ); - } - - if (bookmark.text) { - return buildTextPrompt( - serverConfig.inference.inferredTagLang, - prompts, - bookmark.text.text ?? "", - serverConfig.inference.contextLength, - ); - } - - throw new Error("Unknown bookmark type"); -} - -async function fetchBookmark(linkId: string) { - return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, linkId), - with: { - link: true, - text: true, - asset: true, - }, - }); -} - -async function inferTagsFromImage( - jobId: string, - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - const { asset, metadata } = await readAsset({ - userId: bookmark.userId, - assetId: bookmark.asset.assetId, - }); - - if (!asset) { - throw new Error( - `[inference][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmark.id} not found`, - ); - } - - const base64 = asset.toString("base64"); - return inferenceClient.inferFromImage( - buildImagePrompt( - serverConfig.inference.inferredTagLang, - await fetchCustomPrompts(bookmark.userId, "images"), - ), - metadata.contentType, - base64, - { schema: openAIResponseSchema, abortSignal }, - ); -} - -async function fetchCustomPrompts( - userId: string, - appliesTo: "text" | "images", -) { - const prompts = await db.query.customPrompts.findMany({ - where: and( - eq(customPrompts.userId, userId), - inArray(customPrompts.appliesTo, ["all_tagging", appliesTo]), - ), - columns: { - text: true, - }, - }); - - let promptTexts = prompts.map((p) => p.text); - if (containsTagsPlaceholder(prompts)) { - promptTexts = await replaceTagsPlaceholders(promptTexts, userId); - } - - return promptTexts; -} - -async function replaceTagsPlaceholders( - prompts: string[], - userId: string, -): Promise { - const api = await buildImpersonatingTRPCClient(userId); - const tags = (await api.tags.list()).tags; - const tagsString = `[${tags.map((tag) => tag.name).join(", ")}]`; - const aiTagsString = `[${tags - .filter((tag) => tag.numBookmarksByAttachedType.human ?? 0 == 0) - .map((tag) => tag.name) - .join(", ")}]`; - const userTagsString = `[${tags - .filter((tag) => tag.numBookmarksByAttachedType.human ?? 
0 > 0) - .map((tag) => tag.name) - .join(", ")}]`; - - return prompts.map((p) => - p - .replaceAll("$tags", tagsString) - .replaceAll("$aiTags", aiTagsString) - .replaceAll("$userTags", userTagsString), - ); -} - -function containsTagsPlaceholder(prompts: { text: string }[]): boolean { - return ( - prompts.filter( - (p) => - p.text.includes("$tags") || - p.text.includes("$aiTags") || - p.text.includes("$userTags"), - ).length > 0 - ); -} - -async function inferTagsFromPDF( - _jobId: string, - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - const prompt = buildTextPrompt( - serverConfig.inference.inferredTagLang, - await fetchCustomPrompts(bookmark.userId, "text"), - `Content: ${bookmark.asset.content}`, - serverConfig.inference.contextLength, - ); - return inferenceClient.inferFromText(prompt, { - schema: openAIResponseSchema, - abortSignal, - }); -} - -async function inferTagsFromText( - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - return await inferenceClient.inferFromText(await buildPrompt(bookmark), { - schema: openAIResponseSchema, - abortSignal, - }); -} - -async function inferTags( - jobId: string, - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - let response; - if (bookmark.link || bookmark.text) { - response = await inferTagsFromText(bookmark, inferenceClient, abortSignal); - } else if (bookmark.asset) { - switch (bookmark.asset.assetType) { - case "image": - response = await inferTagsFromImage( - jobId, - bookmark, - inferenceClient, - abortSignal, - ); - break; - case "pdf": - response = await inferTagsFromPDF( - jobId, - bookmark, - inferenceClient, - abortSignal, - ); - break; - default: - throw new Error(`[inference][${jobId}] Unsupported bookmark type`); - } - } else { - throw new Error(`[inference][${jobId}] Unsupported bookmark type`); - } - - if (!response) { - throw new Error(`[inference][${jobId}] Inference response is empty`); - } - - try { - let tags = openAIResponseSchema.parse(JSON.parse(response.response)).tags; - logger.info( - `[inference][${jobId}] Inferring tag for bookmark "${bookmark.id}" used ${response.totalTokens} tokens and inferred: ${tags}`, - ); - - // Sometimes the tags contain the hashtag symbol, let's strip them out if they do. - // Additionally, trim the tags to prevent whitespaces at the beginning/the end of the tag. - tags = tags.map((t) => { - let tag = t; - if (tag.startsWith("#")) { - tag = t.slice(1); - } - return tag.trim(); - }); - - return tags; - } catch (e) { - const responseSneak = response.response.substring(0, 20); - throw new Error( - `[inference][${jobId}] The model ignored our prompt and didn't respond with the expected JSON: ${JSON.stringify(e)}. 
Here's a sneak peak from the response: ${responseSneak}`, - ); - } -} - -async function connectTags( - bookmarkId: string, - inferredTags: string[], - userId: string, -) { - if (inferredTags.length == 0) { - return; - } - - await db.transaction(async (tx) => { - // Attempt to match exiting tags with the new ones - const { matchedTagIds, notFoundTagNames } = await (async () => { - const { normalizeTag, sql: normalizedTagSql } = tagNormalizer( - bookmarkTags.name, - ); - const normalizedInferredTags = inferredTags.map((t) => ({ - originalTag: t, - normalizedTag: normalizeTag(t), - })); - - const matchedTags = await tx.query.bookmarkTags.findMany({ - where: and( - eq(bookmarkTags.userId, userId), - inArray( - normalizedTagSql, - normalizedInferredTags.map((t) => t.normalizedTag), - ), - ), - }); - - const matchedTagIds = matchedTags.map((r) => r.id); - const notFoundTagNames = normalizedInferredTags - .filter( - (t) => - !matchedTags.some( - (mt) => normalizeTag(mt.name) === t.normalizedTag, - ), - ) - .map((t) => t.originalTag); - - return { matchedTagIds, notFoundTagNames }; - })(); - - // Create tags that didn't exist previously - let newTagIds: string[] = []; - if (notFoundTagNames.length > 0) { - newTagIds = ( - await tx - .insert(bookmarkTags) - .values( - notFoundTagNames.map((t) => ({ - name: t, - userId, - })), - ) - .onConflictDoNothing() - .returning() - ).map((t) => t.id); - } - - // Delete old AI tags - const detachedTags = await tx - .delete(tagsOnBookmarks) - .where( - and( - eq(tagsOnBookmarks.attachedBy, "ai"), - eq(tagsOnBookmarks.bookmarkId, bookmarkId), - ), - ) - .returning(); - - const allTagIds = new Set([...matchedTagIds, ...newTagIds]); - - // Attach new ones - const attachedTags = await tx - .insert(tagsOnBookmarks) - .values( - [...allTagIds].map((tagId) => ({ - tagId, - bookmarkId, - attachedBy: "ai" as const, - })), - ) - .onConflictDoNothing() - .returning(); - - await triggerRuleEngineOnEvent(bookmarkId, [ - ...detachedTags.map((t) => ({ - type: "tagRemoved" as const, - tagId: t.tagId, - })), - ...attachedTags.map((t) => ({ - type: "tagAdded" as const, - tagId: t.tagId, - })), - ]); - }); -} - -async function runOpenAI(job: DequeuedJob) { - const jobId = job.id; - - const inferenceClient = InferenceClientFactory.build(); - if (!inferenceClient) { - logger.debug( - `[inference][${jobId}] No inference client configured, nothing to do now`, - ); - return; - } - - const request = zOpenAIRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - const { bookmarkId } = request.data; - const bookmark = await fetchBookmark(bookmarkId); - if (!bookmark) { - throw new Error( - `[inference][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - - logger.info( - `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`, - ); - - const tags = await inferTags( - jobId, - bookmark, - inferenceClient, - job.abortSignal, - ); - - await connectTags(bookmarkId, tags, bookmark.userId); - - // Trigger a webhook - await triggerWebhook(bookmarkId, "ai tagged"); - - // Update the search index - await triggerSearchReindex(bookmarkId); -} diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/ruleEngineWorker.ts deleted file mode 100644 index 427cc383..00000000 --- a/apps/workers/ruleEngineWorker.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import { 
buildImpersonatingAuthedContext } from "trpc"; - -import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { - RuleEngineQueue, - zRuleEngineRequestSchema, -} from "@karakeep/shared/queues"; -import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; - -export class RuleEngineWorker { - static build() { - logger.info("Starting rule engine worker ..."); - const worker = new Runner( - RuleEngineQueue, - { - run: runRuleEngine, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[ruleEngine][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 10, - validator: zRuleEngineRequestSchema, - }, - ); - - return worker; - } -} - -async function getBookmarkUserId(bookmarkId: string) { - return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - columns: { - userId: true, - }, - }); -} - -async function runRuleEngine(job: DequeuedJob) { - const jobId = job.id; - const { bookmarkId, events } = job.data; - - const bookmark = await getBookmarkUserId(bookmarkId); - if (!bookmark) { - throw new Error( - `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - const userId = bookmark.userId; - const authedCtx = await buildImpersonatingAuthedContext(userId); - - const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); - - const results = ( - await Promise.all(events.map((event) => ruleEngine.onEvent(event))) - ).flat(); - - if (results.length == 0) { - return; - } - - const message = results - .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`) - .join("\n"); - - logger.info( - `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`, - ); -} diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts deleted file mode 100644 index e7b827a9..00000000 --- a/apps/workers/searchWorker.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; - -import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { - SearchIndexingQueue, - zSearchIndexingRequestSchema, -} from "@karakeep/shared/queues"; -import { getSearchIdxClient } from "@karakeep/shared/search"; - -export class SearchIndexingWorker { - static build() { - logger.info("Starting search indexing worker ..."); - const worker = new Runner( - SearchIndexingQueue, - { - run: runSearchIndexing, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[search][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function ensureTaskSuccess( - searchClient: NonNullable>>, - taskUid: number, -) { - const task = await searchClient.waitForTask(taskUid); - if (task.error) { - throw 
new Error(`Search task failed: ${task.error.message}`); - } -} - -async function runIndex( - searchClient: NonNullable>>, - bookmarkId: string, -) { - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - link: true, - text: true, - asset: true, - tagsOnBookmarks: { - with: { - tag: true, - }, - }, - }, - }); - - if (!bookmark) { - throw new Error(`Bookmark ${bookmarkId} not found`); - } - - const task = await searchClient.addDocuments( - [ - { - id: bookmark.id, - userId: bookmark.userId, - ...(bookmark.link - ? { - url: bookmark.link.url, - linkTitle: bookmark.link.title, - description: bookmark.link.description, - content: bookmark.link.content, - publisher: bookmark.link.publisher, - author: bookmark.link.author, - datePublished: bookmark.link.datePublished, - dateModified: bookmark.link.dateModified, - } - : undefined), - ...(bookmark.asset - ? { - content: bookmark.asset.content, - metadata: bookmark.asset.metadata, - } - : undefined), - ...(bookmark.text ? { content: bookmark.text.text } : undefined), - note: bookmark.note, - summary: bookmark.summary, - title: bookmark.title, - createdAt: bookmark.createdAt.toISOString(), - tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), - }, - ], - { - primaryKey: "id", - }, - ); - await ensureTaskSuccess(searchClient, task.taskUid); -} - -async function runDelete( - searchClient: NonNullable>>, - bookmarkId: string, -) { - const task = await searchClient.deleteDocument(bookmarkId); - await ensureTaskSuccess(searchClient, task.taskUid); -} - -async function runSearchIndexing(job: DequeuedJob) { - const jobId = job.id; - - const request = zSearchIndexingRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[search][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - const searchClient = await getSearchIdxClient(); - if (!searchClient) { - logger.debug( - `[search][${jobId}] Search is not configured, nothing to do now`, - ); - return; - } - - const bookmarkId = request.data.bookmarkId; - logger.info( - `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`, - ); - - switch (request.data.type) { - case "index": { - await runIndex(searchClient, bookmarkId); - break; - } - case "delete": { - await runDelete(searchClient, bookmarkId); - break; - } - } -} diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/tidyAssetsWorker.ts deleted file mode 100644 index d4c8abdb..00000000 --- a/apps/workers/tidyAssetsWorker.ts +++ /dev/null @@ -1,107 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; - -import { db } from "@karakeep/db"; -import { assets } from "@karakeep/db/schema"; -import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; -import logger from "@karakeep/shared/logger"; -import { - TidyAssetsQueue, - ZTidyAssetsRequest, - zTidyAssetsRequestSchema, -} from "@karakeep/shared/queues"; - -export class TidyAssetsWorker { - static build() { - logger.info("Starting tidy assets worker ..."); - const worker = new Runner( - TidyAssetsQueue, - { - run: runTidyAssets, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[tidyAssets][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - 
); - - return worker; - } -} - -async function handleAsset( - asset: { - assetId: string; - userId: string; - size: number; - contentType: string; - fileName?: string | null; - }, - request: ZTidyAssetsRequest, - jobId: string, -) { - const dbRow = await db.query.assets.findFirst({ - where: eq(assets.id, asset.assetId), - }); - if (!dbRow) { - if (request.cleanDanglingAssets) { - await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); - logger.info( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, - ); - } else { - logger.warn( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`, - ); - } - return; - } - - if (request.syncAssetMetadata) { - await db - .update(assets) - .set({ - contentType: asset.contentType, - fileName: asset.fileName, - size: asset.size, - }) - .where(eq(assets.id, asset.assetId)); - logger.info( - `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`, - ); - } -} - -async function runTidyAssets(job: DequeuedJob) { - const jobId = job.id; - - const request = zTidyAssetsRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - for await (const asset of getAllAssets()) { - try { - handleAsset(asset, request.data, jobId); - } catch (e) { - logger.error( - `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`, - ); - } - } -} diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts deleted file mode 100644 index b8f85ddf..00000000 --- a/apps/workers/videoWorker.ts +++ /dev/null @@ -1,214 +0,0 @@ -import fs from "fs"; -import * as os from "os"; -import path from "path"; -import { execa } from "execa"; -import { DequeuedJob, Runner } from "liteque"; - -import { db } from "@karakeep/db"; -import { AssetTypes } from "@karakeep/db/schema"; -import { - ASSET_TYPES, - getAssetSize, - newAssetId, - saveAssetFromFile, - silentDeleteAsset, -} from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - VideoWorkerQueue, - ZVideoRequest, - zvideoRequestSchema, -} from "@karakeep/shared/queues"; - -import { withTimeout } from "./utils"; -import { getBookmarkDetails, updateAsset } from "./workerUtils"; - -const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads"); - -export class VideoWorker { - static build() { - logger.info("Starting video worker ..."); - - return new Runner( - VideoWorkerQueue, - { - run: withTimeout( - runWorker, - /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout, - ), - onComplete: async (job) => { - const jobId = job.id; - logger.info( - `[VideoCrawler][${jobId}] Video Download Completed successfully`, - ); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`, - ); - return Promise.resolve(); - }, - }, - { - pollIntervalMs: 1000, - timeoutSecs: serverConfig.crawler.downloadVideoTimeout, - concurrency: 1, - validator: zvideoRequestSchema, - }, - ); - } -} - -function prepareYtDlpArguments(url: string, assetPath: string) { - const ytDlpArguments = [url]; - if (serverConfig.crawler.maxVideoDownloadSize > 0) { - ytDlpArguments.push( - "-f", - `best[filesize<${serverConfig.crawler.maxVideoDownloadSize}M]`, - ); - } - - 
ytDlpArguments.push(...serverConfig.crawler.ytDlpArguments); - ytDlpArguments.push("-o", assetPath); - ytDlpArguments.push("--no-playlist"); - return ytDlpArguments; -} - -async function runWorker(job: DequeuedJob) { - const jobId = job.id; - const { bookmarkId } = job.data; - - const { - url, - userId, - videoAssetId: oldVideoAssetId, - } = await getBookmarkDetails(bookmarkId); - - if (!serverConfig.crawler.downloadVideo) { - logger.info( - `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it is disabled in the config.`, - ); - return; - } - - const videoAssetId = newAssetId(); - let assetPath = `${TMP_FOLDER}/${videoAssetId}`; - await fs.promises.mkdir(TMP_FOLDER, { recursive: true }); - - const ytDlpArguments = prepareYtDlpArguments(url, assetPath); - - try { - logger.info( - `[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`, - ); - - await execa("yt-dlp", ytDlpArguments, { - cancelSignal: job.abortSignal, - }); - const downloadPath = await findAssetFile(videoAssetId); - if (!downloadPath) { - logger.info( - "[VideoCrawler][${jobId}] yt-dlp didn't download anything. Skipping ...", - ); - return; - } - assetPath = downloadPath; - } catch (e) { - const err = e as Error; - if ( - err.message.includes("ERROR: Unsupported URL:") || - err.message.includes("No media found") - ) { - logger.info( - `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it's not one of the supported yt-dlp URLs`, - ); - return; - } - logger.error( - `[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`, - ); - await deleteLeftOverAssetFile(jobId, videoAssetId); - return; - } - - logger.info( - `[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`, - ); - await saveAssetFromFile({ - userId, - assetId: videoAssetId, - assetPath, - metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, - }); - - await db.transaction(async (txn) => { - await updateAsset( - oldVideoAssetId, - { - id: videoAssetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_VIDEO, - contentType: ASSET_TYPES.VIDEO_MP4, - size: await getAssetSize({ userId, assetId: videoAssetId }), - }, - txn, - ); - }); - await silentDeleteAsset(userId, oldVideoAssetId); - - logger.info( - `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, - ); -} - -/** - * Deletes leftover assets in case the download fails - * - * @param jobId the id of the job - * @param assetId the id of the asset to delete - */ -async function deleteLeftOverAssetFile( - jobId: string, - assetId: string, -): Promise { - let assetFile; - try { - assetFile = await findAssetFile(assetId); - } catch { - // ignore exception, no asset file was found - return; - } - if (!assetFile) { - return; - } - logger.info( - `[VideoCrawler][${jobId}] Deleting leftover video asset "${assetFile}".`, - ); - try { - await fs.promises.rm(assetFile); - } catch (e) { - logger.error( - `[VideoCrawler][${jobId}] Failed deleting leftover video asset "${assetFile}".`, - ); - } -} - -/** - * yt-dlp automatically adds a file ending to the passed in filename --> we have to search it again in the folder - * - * @param assetId the id of the asset to search - * @returns the path to the downloaded asset - */ -async function findAssetFile(assetId: string): Promise { - const files = await fs.promises.readdir(TMP_FOLDER); - for (const file of files) { - if (file.startsWith(assetId)) { - return 
path.join(TMP_FOLDER, file); - } - } - return null; -} diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts deleted file mode 100644 index 9d3ed2c1..00000000 --- a/apps/workers/webhookWorker.ts +++ /dev/null @@ -1,146 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import fetch from "node-fetch"; - -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - WebhookQueue, - ZWebhookRequest, - zWebhookRequestSchema, -} from "@karakeep/shared/queues"; - -export class WebhookWorker { - static build() { - logger.info("Starting webhook worker ..."); - const worker = new Runner( - WebhookQueue, - { - run: runWebhook, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[webhook][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: - serverConfig.webhook.timeoutSec * - (serverConfig.webhook.retryTimes + 1) + - 1, //consider retry times, and timeout and add 1 second for other stuff - validator: zWebhookRequestSchema, - }, - ); - - return worker; - } -} - -async function fetchBookmark(bookmarkId: string) { - return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - link: { - columns: { - url: true, - }, - }, - user: { - columns: {}, - with: { - webhooks: true, - }, - }, - }, - }); -} - -async function runWebhook(job: DequeuedJob) { - const jobId = job.id; - const webhookTimeoutSec = serverConfig.webhook.timeoutSec; - - const { bookmarkId } = job.data; - const bookmark = await fetchBookmark(bookmarkId); - if (!bookmark) { - throw new Error( - `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - - if (!bookmark.user.webhooks) { - return; - } - - logger.info( - `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id} for operation "${job.data.operation}"`, - ); - - await Promise.allSettled( - bookmark.user.webhooks - .filter((w) => w.events.includes(job.data.operation)) - .map(async (webhook) => { - const url = webhook.url; - const webhookToken = webhook.token; - const maxRetries = serverConfig.webhook.retryTimes; - let attempt = 0; - let success = false; - - while (attempt < maxRetries && !success) { - try { - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - ...(webhookToken - ? { - Authorization: `Bearer ${webhookToken}`, - } - : {}), - }, - body: JSON.stringify({ - jobId, - bookmarkId, - userId: bookmark.userId, - url: bookmark.link ? 
bookmark.link.url : undefined, - type: bookmark.type, - operation: job.data.operation, - }), - signal: AbortSignal.timeout(webhookTimeoutSec * 1000), - }); - - if (!response.ok) { - logger.error( - `Webhook call to ${url} failed with status: ${response.status}`, - ); - } else { - logger.info( - `[webhook][${jobId}] Webhook to ${url} call succeeded`, - ); - success = true; - } - } catch (error) { - logger.error( - `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`, - ); - } - attempt++; - if (!success && attempt < maxRetries) { - logger.info( - `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`, - ); - } - } - }), - ); -} diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts new file mode 100644 index 00000000..0c0b7aec --- /dev/null +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -0,0 +1,339 @@ +import os from "os"; +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import PDFParser from "pdf2json"; +import { fromBuffer } from "pdf2pic"; +import { createWorker } from "tesseract.js"; + +import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { + assets, + AssetTypes, + bookmarkAssets, + bookmarks, +} from "@karakeep/db/schema"; +import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + AssetPreprocessingQueue, + OpenAIQueue, + triggerSearchReindex, +} from "@karakeep/shared/queues"; + +export class AssetPreprocessingWorker { + static build() { + logger.info("Starting asset preprocessing worker ..."); + const worker = new Runner( + AssetPreprocessingQueue, + { + run: run, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function readImageText(buffer: Buffer) { + if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") { + return null; + } + const worker = await createWorker(serverConfig.ocr.langs, undefined, { + cachePath: serverConfig.ocr.cacheDir ?? 
os.tmpdir(), + }); + try { + const ret = await worker.recognize(buffer); + if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) { + return null; + } + return ret.data.text; + } finally { + await worker.terminate(); + } +} + +async function readPDFText(buffer: Buffer): Promise<{ + text: string; + metadata: Record; +}> { + return new Promise((resolve, reject) => { + const pdfParser = new PDFParser(null, true); + pdfParser.on("pdfParser_dataError", reject); + pdfParser.on("pdfParser_dataReady", (pdfData) => { + resolve({ + text: pdfParser.getRawTextContent(), + metadata: pdfData.Meta, + }); + }); + pdfParser.parseBuffer(buffer); + }); +} + +export async function extractAndSavePDFScreenshot( + jobId: string, + asset: Buffer, + bookmark: NonNullable>>, + isFixMode: boolean, +): Promise { + { + const alreadyHasScreenshot = + bookmark.assets.find( + (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT, + ) !== undefined; + if (alreadyHasScreenshot && isFixMode) { + logger.info( + `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`, + ); + return false; + } + } + logger.info( + `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`, + ); + try { + /** + * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL + * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md + */ + const screenshot = await fromBuffer(asset, { + density: 100, + quality: 100, + format: "png", + preserveAspectRatio: true, + })(1, { responseType: "buffer" }); + + if (!screenshot.buffer) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`, + ); + return false; + } + + // Store the screenshot + const assetId = newAssetId(); + const fileName = "screenshot.png"; + const contentType = "image/png"; + await saveAsset({ + userId: bookmark.userId, + assetId, + asset: screenshot.buffer, + metadata: { + contentType, + fileName, + }, + }); + + // Insert into database + await db.insert(assets).values({ + id: assetId, + bookmarkId: bookmark.id, + userId: bookmark.userId, + assetType: AssetTypes.ASSET_SCREENSHOT, + contentType, + size: screenshot.buffer.byteLength, + fileName, + }); + + logger.info( + `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`, + ); + return true; + } catch (error) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, + ); + return false; + } +} + +async function extractAndSaveImageText( + jobId: string, + asset: Buffer, + bookmark: NonNullable>>, + isFixMode: boolean, +): Promise { + { + const alreadyHasText = !!bookmark.asset.content; + if (alreadyHasText && isFixMode) { + logger.info( + `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`, + ); + return false; + } + } + let imageText = null; + logger.info( + `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, + ); + try { + imageText = await readImageText(asset); + } catch (e) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, + ); + } + if (!imageText) { + return false; + } + + logger.info( + `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`, + ); + await db + .update(bookmarkAssets) + .set({ + content: imageText, + metadata: null, + }) + .where(eq(bookmarkAssets.id, bookmark.id)); + return true; +} + +async function 
extractAndSavePDFText( + jobId: string, + asset: Buffer, + bookmark: NonNullable>>, + isFixMode: boolean, +): Promise { + { + const alreadyHasText = !!bookmark.asset.content; + if (alreadyHasText && isFixMode) { + logger.info( + `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`, + ); + return false; + } + } + logger.info( + `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`, + ); + const pdfParse = await readPDFText(asset); + if (!pdfParse?.text) { + throw new Error( + `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`, + ); + } + logger.info( + `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`, + ); + await db + .update(bookmarkAssets) + .set({ + content: pdfParse.text, + metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null, + }) + .where(eq(bookmarkAssets.id, bookmark.id)); + return true; +} + +async function getBookmark(bookmarkId: string) { + return db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + asset: true, + assets: true, + }, + }); +} + +async function run(req: DequeuedJob) { + const isFixMode = req.data.fixMode; + const jobId = req.id; + const bookmarkId = req.data.bookmarkId; + + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + asset: true, + assets: true, + }, + }); + + logger.info( + `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`, + ); + + if (!bookmark) { + throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`); + } + + if (!bookmark.asset) { + throw new Error( + `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`, + ); + } + + const { asset } = await readAsset({ + userId: bookmark.userId, + assetId: bookmark.asset.assetId, + }); + + if (!asset) { + throw new Error( + `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`, + ); + } + + let anythingChanged = false; + switch (bookmark.asset.assetType) { + case "image": { + const extractedText = await extractAndSaveImageText( + jobId, + asset, + bookmark, + isFixMode, + ); + anythingChanged ||= extractedText; + break; + } + case "pdf": { + const extractedText = await extractAndSavePDFText( + jobId, + asset, + bookmark, + isFixMode, + ); + const extractedScreenshot = await extractAndSavePDFScreenshot( + jobId, + asset, + bookmark, + isFixMode, + ); + anythingChanged ||= extractedText || extractedScreenshot; + break; + } + default: + throw new Error( + `[assetPreprocessing][${jobId}] Unsupported bookmark type`, + ); + } + + if (!isFixMode || anythingChanged) { + await OpenAIQueue.enqueue({ + bookmarkId, + type: "tag", + }); + + // Update the search index + await triggerSearchReindex(bookmarkId); + } +} diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts new file mode 100644 index 00000000..b928e145 --- /dev/null +++ b/apps/workers/workers/crawlerWorker.ts @@ -0,0 +1,882 @@ +import * as dns from "dns"; +import { promises as fs } from "fs"; +import * as path from "node:path"; +import * as os from "os"; +import type { Browser } from "puppeteer"; +import { PuppeteerBlocker } from "@ghostery/adblocker-puppeteer"; +import { Readability } from "@mozilla/readability"; +import { Mutex } from "async-mutex"; +import DOMPurify from "dompurify"; +import { eq } from "drizzle-orm"; +import { 
execa } from "execa"; +import { isShuttingDown } from "exit"; +import { JSDOM } from "jsdom"; +import { DequeuedJob, Runner } from "liteque"; +import metascraper from "metascraper"; +import metascraperAmazon from "metascraper-amazon"; +import metascraperAuthor from "metascraper-author"; +import metascraperDate from "metascraper-date"; +import metascraperDescription from "metascraper-description"; +import metascraperImage from "metascraper-image"; +import metascraperLogo from "metascraper-logo-favicon"; +import metascraperPublisher from "metascraper-publisher"; +import metascraperReadability from "metascraper-readability"; +import metascraperTitle from "metascraper-title"; +import metascraperTwitter from "metascraper-twitter"; +import metascraperUrl from "metascraper-url"; +import fetch from "node-fetch"; +import puppeteer from "puppeteer-extra"; +import StealthPlugin from "puppeteer-extra-plugin-stealth"; +import { withTimeout } from "utils"; +import { getBookmarkDetails, updateAsset } from "workerUtils"; + +import type { ZCrawlLinkRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { + assets, + AssetTypes, + bookmarkAssets, + bookmarkLinks, + bookmarks, +} from "@karakeep/db/schema"; +import { + ASSET_TYPES, + getAssetSize, + IMAGE_ASSET_TYPES, + newAssetId, + readAsset, + saveAsset, + saveAssetFromFile, + silentDeleteAsset, + SUPPORTED_UPLOAD_ASSET_TYPES, +} from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + AssetPreprocessingQueue, + LinkCrawlerQueue, + OpenAIQueue, + triggerSearchReindex, + triggerVideoWorker, + triggerWebhook, + zCrawlLinkRequestSchema, +} from "@karakeep/shared/queues"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +const metascraperParser = metascraper([ + metascraperDate({ + dateModified: true, + datePublished: true, + }), + metascraperAmazon(), + metascraperReadability(), + metascraperAuthor(), + metascraperPublisher(), + metascraperTitle(), + metascraperDescription(), + metascraperTwitter(), + metascraperImage(), + metascraperLogo(), + metascraperUrl(), +]); + +let globalBrowser: Browser | undefined; +let globalBlocker: PuppeteerBlocker | undefined; +// Guards the interactions with the browser instance. +// This is needed given that most of the browser APIs are async. 
+const browserMutex = new Mutex(); + +async function startBrowserInstance() { + const defaultViewport = { + width: 1440, + height: 900, + }; + if (serverConfig.crawler.browserWebSocketUrl) { + logger.info( + `[Crawler] Connecting to existing browser websocket address: ${serverConfig.crawler.browserWebSocketUrl}`, + ); + return puppeteer.connect({ + browserWSEndpoint: serverConfig.crawler.browserWebSocketUrl, + defaultViewport, + }); + } else if (serverConfig.crawler.browserWebUrl) { + logger.info( + `[Crawler] Connecting to existing browser instance: ${serverConfig.crawler.browserWebUrl}`, + ); + const webUrl = new URL(serverConfig.crawler.browserWebUrl); + // We need to resolve the ip address as a workaround for https://github.com/puppeteer/puppeteer/issues/2242 + const { address: address } = await dns.promises.lookup(webUrl.hostname); + webUrl.hostname = address; + logger.info( + `[Crawler] Successfully resolved IP address, new address: ${webUrl.toString()}`, + ); + return puppeteer.connect({ + browserURL: webUrl.toString(), + defaultViewport, + }); + } else { + logger.info(`Running in browserless mode`); + return undefined; + } +} + +async function launchBrowser() { + globalBrowser = undefined; + await browserMutex.runExclusive(async () => { + try { + globalBrowser = await startBrowserInstance(); + } catch (e) { + logger.error( + `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${(e as Error).stack}`, + ); + if (isShuttingDown) { + logger.info("[Crawler] We're shutting down so won't retry."); + return; + } + setTimeout(() => { + launchBrowser(); + }, 5000); + return; + } + globalBrowser?.on("disconnected", () => { + if (isShuttingDown) { + logger.info( + "[Crawler] The puppeteer browser got disconnected. But we're shutting down so won't restart it.", + ); + return; + } + logger.info( + "[Crawler] The puppeteer browser got disconnected. Will attempt to launch it again.", + ); + launchBrowser(); + }); + }); +} + +export class CrawlerWorker { + static async build() { + puppeteer.use(StealthPlugin()); + if (serverConfig.crawler.enableAdblocker) { + try { + logger.info("[crawler] Loading adblocker ..."); + globalBlocker = await PuppeteerBlocker.fromPrebuiltFull(fetch, { + path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), + read: fs.readFile, + write: fs.writeFile, + }); + } catch (e) { + logger.error( + `[crawler] Failed to load adblocker. 
Will not be blocking ads: ${e}`, + ); + } + } + if (!serverConfig.crawler.browserConnectOnDemand) { + await launchBrowser(); + } else { + logger.info( + "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance", + ); + } + + logger.info("Starting crawler worker ..."); + const worker = new Runner( + LinkCrawlerQueue, + { + run: withTimeout( + runCrawler, + /* timeoutSec */ serverConfig.crawler.jobTimeoutSec, + ), + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[Crawler][${jobId}] Completed successfully`); + const bookmarkId = job.data.bookmarkId; + if (bookmarkId) { + await changeBookmarkStatus(bookmarkId, "success"); + } + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`, + ); + const bookmarkId = job.data?.bookmarkId; + if (bookmarkId && job.numRetriesLeft == 0) { + await changeBookmarkStatus(bookmarkId, "failure"); + } + }, + }, + { + pollIntervalMs: 1000, + timeoutSecs: serverConfig.crawler.jobTimeoutSec, + concurrency: serverConfig.crawler.numWorkers, + }, + ); + + return worker; + } +} + +type DBAssetType = typeof assets.$inferInsert; + +async function changeBookmarkStatus( + bookmarkId: string, + crawlStatus: "success" | "failure", +) { + await db + .update(bookmarkLinks) + .set({ + crawlStatus, + }) + .where(eq(bookmarkLinks.id, bookmarkId)); +} + +/** + * This provides some "basic" protection from malicious URLs. However, all of those + * can be easily circumvented by pointing dns of origin to localhost, or with + * redirects. + */ +function validateUrl(url: string) { + const urlParsed = new URL(url); + if (urlParsed.protocol != "http:" && urlParsed.protocol != "https:") { + throw new Error(`Unsupported URL protocol: ${urlParsed.protocol}`); + } + + if (["localhost", "127.0.0.1", "0.0.0.0"].includes(urlParsed.hostname)) { + throw new Error(`Link hostname rejected: ${urlParsed.hostname}`); + } +} + +async function browserlessCrawlPage( + jobId: string, + url: string, + abortSignal: AbortSignal, +) { + logger.info( + `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, + ); + const response = await fetch(url, { + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + logger.info( + `[Crawler][${jobId}] Successfully fetched the content of "${url}". 
Status: ${response.status}, Size: ${response.size}`, + ); + return { + htmlContent: await response.text(), + statusCode: response.status, + screenshot: undefined, + url: response.url, + }; +} + +async function crawlPage( + jobId: string, + url: string, + abortSignal: AbortSignal, +): Promise<{ + htmlContent: string; + screenshot: Buffer | undefined; + statusCode: number; + url: string; +}> { + let browser: Browser | undefined; + if (serverConfig.crawler.browserConnectOnDemand) { + browser = await startBrowserInstance(); + } else { + browser = globalBrowser; + } + if (!browser) { + return browserlessCrawlPage(jobId, url, abortSignal); + } + const context = await browser.createBrowserContext(); + + try { + const page = await context.newPage(); + if (globalBlocker) { + await globalBlocker.enableBlockingInPage(page); + } + await page.setUserAgent( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + ); + + const response = await page.goto(url, { + timeout: serverConfig.crawler.navigateTimeoutSec * 1000, + }); + logger.info( + `[Crawler][${jobId}] Successfully navigated to "${url}". Waiting for the page to load ...`, + ); + + // Wait until there's at most two connections for 2 seconds + // Attempt to wait only for 5 seconds + await Promise.race([ + page.waitForNetworkIdle({ + idleTime: 1000, // 1 sec + concurrency: 2, + }), + new Promise((f) => setTimeout(f, 5000)), + ]); + + logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`); + + const htmlContent = await page.content(); + logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`); + + let screenshot: Buffer | undefined = undefined; + if (serverConfig.crawler.storeScreenshot) { + try { + screenshot = await Promise.race([ + page.screenshot({ + // If you change this, you need to change the asset type in the store function. + type: "png", + encoding: "binary", + fullPage: serverConfig.crawler.fullPageScreenshot, + }), + new Promise((_, reject) => + setTimeout( + () => + reject( + "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + ), + serverConfig.crawler.screenshotTimeoutSec * 1000, + ), + ), + ]); + logger.info( + `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, + ); + } catch (e) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${e}`, + ); + } + } + + return { + htmlContent, + statusCode: response?.status() ?? 0, + screenshot, + url: page.url(), + }; + } finally { + await context.close(); + if (serverConfig.crawler.browserConnectOnDemand) { + await browser.close(); + } + } +} + +async function extractMetadata( + htmlContent: string, + url: string, + jobId: string, +) { + logger.info( + `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, + ); + const meta = await metascraperParser({ + url, + html: htmlContent, + // We don't want to validate the URL again as we've already done it by visiting the page. + // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). 
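// Illustrative example (assumed, not taken from the original comment): a URL such as
// "https://example.com/post?" that we already visited successfully would otherwise
// be rejected purely because of its trailing empty query string.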
+ validateUrl: false, + }); + logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`); + return meta; +} + +function extractReadableContent( + htmlContent: string, + url: string, + jobId: string, +) { + logger.info( + `[Crawler][${jobId}] Will attempt to extract readable content ...`, + ); + const dom = new JSDOM(htmlContent, { url }); + const readableContent = new Readability(dom.window.document).parse(); + if (!readableContent || typeof readableContent.content !== "string") { + return null; + } + + const window = new JSDOM("").window; + const purify = DOMPurify(window); + const purifiedHTML = purify.sanitize(readableContent.content); + + logger.info(`[Crawler][${jobId}] Done extracting readable content.`); + return { + content: purifiedHTML, + textContent: readableContent.textContent, + }; +} + +async function storeScreenshot( + screenshot: Buffer | undefined, + userId: string, + jobId: string, +) { + if (!serverConfig.crawler.storeScreenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, + ); + return null; + } + if (!screenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, + ); + return null; + } + const assetId = newAssetId(); + const contentType = "image/png"; + const fileName = "screenshot.png"; + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: screenshot, + }); + logger.info( + `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, + ); + return { assetId, contentType, fileName, size: screenshot.byteLength }; +} + +async function downloadAndStoreFile( + url: string, + userId: string, + jobId: string, + fileType: string, + abortSignal: AbortSignal, +) { + try { + logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`); + const response = await fetch(url, { + signal: abortSignal, + }); + if (!response.ok) { + throw new Error(`Failed to download ${fileType}: ${response.status}`); + } + const buffer = await response.arrayBuffer(); + const assetId = newAssetId(); + + const contentType = response.headers.get("content-type"); + if (!contentType) { + throw new Error("No content type in the response"); + } + + await saveAsset({ + userId, + assetId, + metadata: { contentType }, + asset: Buffer.from(buffer), + }); + + logger.info( + `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId}`, + ); + + return { assetId, userId, contentType, size: buffer.byteLength }; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, + ); + return null; + } +} + +async function downloadAndStoreImage( + url: string, + userId: string, + jobId: string, + abortSignal: AbortSignal, +) { + if (!serverConfig.crawler.downloadBannerImage) { + logger.info( + `[Crawler][${jobId}] Skipping downloading the image as per the config.`, + ); + return null; + } + return downloadAndStoreFile(url, userId, jobId, "image", abortSignal); +} + +async function archiveWebpage( + html: string, + url: string, + userId: string, + jobId: string, + abortSignal: AbortSignal, +) { + logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); + const assetId = newAssetId(); + const assetPath = `/tmp/${assetId}`; + + await execa({ + input: html, + cancelSignal: abortSignal, + })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]); + + const contentType = "text/html"; + + await saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata: { + contentType, + }, + }); + + logger.info( 
+ `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + ); + + return { + assetId, + contentType, + size: await getAssetSize({ userId, assetId }), + }; +} + +async function getContentType( + url: string, + jobId: string, + abortSignal: AbortSignal, +): Promise { + try { + logger.info( + `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, + ); + const response = await fetch(url, { + method: "HEAD", + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + const contentType = response.headers.get("content-type"); + logger.info( + `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, + ); + return contentType; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, + ); + return null; + } +} + +/** + * Downloads the asset from the URL and transforms the linkBookmark to an assetBookmark + * @param url the url the user provided + * @param assetType the type of the asset we're downloading + * @param userId the id of the user + * @param jobId the id of the job for logging + * @param bookmarkId the id of the bookmark + */ +async function handleAsAssetBookmark( + url: string, + assetType: "image" | "pdf", + userId: string, + jobId: string, + bookmarkId: string, + abortSignal: AbortSignal, +) { + const downloaded = await downloadAndStoreFile( + url, + userId, + jobId, + assetType, + abortSignal, + ); + if (!downloaded) { + return; + } + const fileName = path.basename(new URL(url).pathname); + await db.transaction(async (trx) => { + await updateAsset( + undefined, + { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.BOOKMARK_ASSET, + contentType: downloaded.contentType, + size: downloaded.size, + fileName, + }, + trx, + ); + await trx.insert(bookmarkAssets).values({ + id: bookmarkId, + assetType, + assetId: downloaded.assetId, + content: null, + fileName, + sourceUrl: url, + }); + // Switch the type of the bookmark from LINK to ASSET + await trx + .update(bookmarks) + .set({ type: BookmarkTypes.ASSET }) + .where(eq(bookmarks.id, bookmarkId)); + await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); + }); + await AssetPreprocessingQueue.enqueue({ + bookmarkId, + fixMode: false, + }); +} + +async function crawlAndParseUrl( + url: string, + userId: string, + jobId: string, + bookmarkId: string, + oldScreenshotAssetId: string | undefined, + oldImageAssetId: string | undefined, + oldFullPageArchiveAssetId: string | undefined, + precrawledArchiveAssetId: string | undefined, + archiveFullPage: boolean, + abortSignal: AbortSignal, +) { + let result: { + htmlContent: string; + screenshot: Buffer | undefined; + statusCode: number | null; + url: string; + }; + + if (precrawledArchiveAssetId) { + logger.info( + `[Crawler][${jobId}] The page has been precrawled. 
Will use the precrawled archive instead.`, + ); + const asset = await readAsset({ + userId, + assetId: precrawledArchiveAssetId, + }); + result = { + htmlContent: asset.asset.toString(), + screenshot: undefined, + statusCode: 200, + url, + }; + } else { + result = await crawlPage(jobId, url, abortSignal); + } + abortSignal.throwIfAborted(); + + const { htmlContent, screenshot, statusCode, url: browserUrl } = result; + + const [meta, readableContent, screenshotAssetInfo] = await Promise.all([ + extractMetadata(htmlContent, browserUrl, jobId), + extractReadableContent(htmlContent, browserUrl, jobId), + storeScreenshot(screenshot, userId, jobId), + ]); + abortSignal.throwIfAborted(); + let imageAssetInfo: DBAssetType | null = null; + if (meta.image) { + const downloaded = await downloadAndStoreImage( + meta.image, + userId, + jobId, + abortSignal, + ); + if (downloaded) { + imageAssetInfo = { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_BANNER_IMAGE, + contentType: downloaded.contentType, + size: downloaded.size, + }; + } + } + abortSignal.throwIfAborted(); + + const parseDate = (date: string | undefined) => { + if (!date) { + return null; + } + try { + return new Date(date); + } catch (_e) { + return null; + } + }; + + // TODO(important): Restrict the size of content to store + await db.transaction(async (txn) => { + await txn + .update(bookmarkLinks) + .set({ + title: meta.title, + description: meta.description, + // Don't store data URIs as they're not valid URLs and are usually quite large + imageUrl: meta.image?.startsWith("data:") ? null : meta.image, + favicon: meta.logo, + content: readableContent?.textContent, + htmlContent: readableContent?.content, + crawledAt: new Date(), + crawlStatusCode: statusCode, + author: meta.author, + publisher: meta.publisher, + datePublished: parseDate(meta.datePublished), + dateModified: parseDate(meta.dateModified), + }) + .where(eq(bookmarkLinks.id, bookmarkId)); + + if (screenshotAssetInfo) { + await updateAsset( + oldScreenshotAssetId, + { + id: screenshotAssetInfo.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_SCREENSHOT, + contentType: screenshotAssetInfo.contentType, + size: screenshotAssetInfo.size, + fileName: screenshotAssetInfo.fileName, + }, + txn, + ); + } + if (imageAssetInfo) { + await updateAsset(oldImageAssetId, imageAssetInfo, txn); + } + }); + + // Delete the old assets if any + await Promise.all([ + silentDeleteAsset(userId, oldScreenshotAssetId), + silentDeleteAsset(userId, oldImageAssetId), + ]); + + return async () => { + if ( + !precrawledArchiveAssetId && + (serverConfig.crawler.fullPageArchive || archiveFullPage) + ) { + const { + assetId: fullPageArchiveAssetId, + size, + contentType, + } = await archiveWebpage( + htmlContent, + browserUrl, + userId, + jobId, + abortSignal, + ); + + await db.transaction(async (txn) => { + await updateAsset( + oldFullPageArchiveAssetId, + { + id: fullPageArchiveAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, + contentType, + size, + fileName: null, + }, + txn, + ); + }); + if (oldFullPageArchiveAssetId) { + silentDeleteAsset(userId, oldFullPageArchiveAssetId); + } + } + }; +} + +async function runCrawler(job: DequeuedJob) { + const jobId = job.id ?? 
"unknown"; + + const request = zCrawlLinkRequestSchema.safeParse(job.data); + if (!request.success) { + logger.error( + `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + return; + } + + const { bookmarkId, archiveFullPage } = request.data; + const { + url, + userId, + screenshotAssetId: oldScreenshotAssetId, + imageAssetId: oldImageAssetId, + fullPageArchiveAssetId: oldFullPageArchiveAssetId, + precrawledArchiveAssetId, + } = await getBookmarkDetails(bookmarkId); + + logger.info( + `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, + ); + validateUrl(url); + + const contentType = await getContentType(url, jobId, job.abortSignal); + + // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage + const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF; + + if (isPdf) { + await handleAsAssetBookmark( + url, + "pdf", + userId, + jobId, + bookmarkId, + job.abortSignal, + ); + } else if ( + contentType && + IMAGE_ASSET_TYPES.has(contentType) && + SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType) + ) { + await handleAsAssetBookmark( + url, + "image", + userId, + jobId, + bookmarkId, + job.abortSignal, + ); + } else { + const archivalLogic = await crawlAndParseUrl( + url, + userId, + jobId, + bookmarkId, + oldScreenshotAssetId, + oldImageAssetId, + oldFullPageArchiveAssetId, + precrawledArchiveAssetId, + archiveFullPage, + job.abortSignal, + ); + + // Enqueue openai job (if not set, assume it's true for backward compatibility) + if (job.data.runInference !== false) { + await OpenAIQueue.enqueue({ + bookmarkId, + type: "tag", + }); + await OpenAIQueue.enqueue({ + bookmarkId, + type: "summarize", + }); + } + + // Update the search index + await triggerSearchReindex(bookmarkId); + + // Trigger a potential download of a video from the URL + await triggerVideoWorker(bookmarkId, url); + + // Trigger a webhook + await triggerWebhook(bookmarkId, "crawled"); + + // Do the archival as a separate last step as it has the potential for failure + await archivalLogic(); + } +} diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts new file mode 100644 index 00000000..1eaba0c3 --- /dev/null +++ b/apps/workers/workers/feedWorker.ts @@ -0,0 +1,215 @@ +import { and, eq, inArray } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import cron from "node-cron"; +import Parser from "rss-parser"; +import { buildImpersonatingTRPCClient } from "trpc"; +import { z } from "zod"; + +import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { FeedQueue } from "@karakeep/shared/queues"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +export const FeedRefreshingWorker = cron.schedule( + "0 * * * *", + () => { + logger.info("[feed] Scheduling feed refreshing jobs ..."); + db.query.rssFeedsTable + .findMany({ + columns: { + id: true, + }, + where: eq(rssFeedsTable.enabled, true), + }) + .then((feeds) => { + for (const feed of feeds) { + FeedQueue.enqueue( + { + feedId: feed.id, + }, + { + idempotencyKey: feed.id, + }, + ); + } + }); + }, + { + runOnInit: false, + scheduled: false, + }, +); + +export class FeedWorker { + static build() { + logger.info("Starting feed worker ..."); + const worker = new Runner( + FeedQueue, + { + run: run, + onComplete: async (job) => { + const jobId = 
job.id; + logger.info(`[feed][${jobId}] Completed successfully`); + await db + .update(rssFeedsTable) + .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() }) + .where(eq(rssFeedsTable.id, job.data?.feedId)); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, + ); + if (job.data) { + await db + .update(rssFeedsTable) + .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() }) + .where(eq(rssFeedsTable.id, job.data?.feedId)); + } + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function run(req: DequeuedJob) { + const jobId = req.id; + const feed = await db.query.rssFeedsTable.findFirst({ + where: eq(rssFeedsTable.id, req.data.feedId), + }); + if (!feed) { + throw new Error( + `[feed][${jobId}] Feed with id ${req.data.feedId} not found`, + ); + } + logger.info( + `[feed][${jobId}] Starting fetching feed "${feed.name}" (${feed.id}) ...`, + ); + + const response = await fetch(feed.url, { + signal: AbortSignal.timeout(5000), + headers: { + UserAgent: + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + Accept: "application/rss+xml", + }, + }); + if (response.status !== 200) { + throw new Error( + `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) returned a non-success status: ${response.status}.`, + ); + } + const contentType = response.headers.get("content-type"); + if (!contentType || !contentType.includes("xml")) { + throw new Error( + `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) is not a valid RSS feed`, + ); + } + const xmlData = await response.text(); + + logger.info( + `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`, + ); + + const parser = new Parser({ + customFields: { + item: ["id"], + }, + }); + const unparseFeedData = await parser.parseString(xmlData); + + // Apparently, we can't trust the output of the xml parser. So let's do our own type + // validation. + const feedItemsSchema = z.object({ + id: z.coerce.string(), + link: z.string().optional(), + guid: z.string().optional(), + }); + + const feedItems = unparseFeedData.items + .map((i) => feedItemsSchema.safeParse(i)) + .flatMap((i) => (i.success ? [i.data] : [])); + + logger.info( + `[feed][${jobId}] Found ${feedItems.length} entries in feed "${feed.name}" (${feed.id}) ...`, + ); + + if (feedItems.length === 0) { + logger.info(`[feed][${jobId}] No entries found.`); + return; + } + + // For feeds that don't have guids, use the link as the id + feedItems.forEach((item) => { + item.guid = item.guid ?? `${item.id}` ?? 
item.link; + }); + + const exitingEntries = await db.query.rssFeedImportsTable.findMany({ + where: and( + eq(rssFeedImportsTable.rssFeedId, feed.id), + inArray( + rssFeedImportsTable.entryId, + feedItems.map((item) => item.guid).filter((id): id is string => !!id), + ), + ), + }); + + const newEntries = feedItems.filter( + (item) => + !exitingEntries.some((entry) => entry.entryId === item.guid) && + item.link && + item.guid, + ); + + if (newEntries.length === 0) { + logger.info( + `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`, + ); + return; + } + + logger.info( + `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`, + ); + + const trpcClient = await buildImpersonatingTRPCClient(feed.userId); + + const createdBookmarks = await Promise.allSettled( + newEntries.map((item) => + trpcClient.bookmarks.createBookmark({ + type: BookmarkTypes.LINK, + url: item.link!, + }), + ), + ); + + // It's ok if this is not transactional as the bookmarks will get linked in the next iteration. + await db + .insert(rssFeedImportsTable) + .values( + newEntries.map((item, idx) => { + const b = createdBookmarks[idx]; + return { + entryId: item.guid!, + bookmarkId: b.status === "fulfilled" ? b.value.id : null, + rssFeedId: feed.id, + }; + }), + ) + .onConflictDoNothing(); + + logger.info( + `[feed][${jobId}] Successfully imported ${newEntries.length} new enteries from feed "${feed.name}" (${feed.id}).`, + ); + + return Promise.resolve(); +} diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts new file mode 100644 index 00000000..f7492c8b --- /dev/null +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -0,0 +1,100 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; + +import type { ZOpenAIRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; +import { InferenceClientFactory } from "@karakeep/shared/inference"; +import logger from "@karakeep/shared/logger"; +import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues"; + +import { runSummarization } from "./summarize"; +import { runTagging } from "./tagging"; + +async function attemptMarkStatus( + jobData: object | undefined, + status: "success" | "failure", +) { + if (!jobData) { + return; + } + try { + const request = zOpenAIRequestSchema.parse(jobData); + await db + .update(bookmarks) + .set({ + ...(request.type === "summarize" + ? { summarizationStatus: status } + : {}), + ...(request.type === "tag" ? 
{ taggingStatus: status } : {}), + }) + .where(eq(bookmarks.id, request.bookmarkId)); + } catch (e) { + logger.error(`Something went wrong when marking the tagging status: ${e}`); + } +} + +export class OpenAiWorker { + static build() { + logger.info("Starting inference worker ..."); + const worker = new Runner( + OpenAIQueue, + { + run: runOpenAI, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[inference][${jobId}] Completed successfully`); + await attemptMarkStatus(job.data, "success"); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`, + ); + if (job.numRetriesLeft == 0) { + await attemptMarkStatus(job?.data, "failure"); + } + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: serverConfig.inference.jobTimeoutSec, + }, + ); + + return worker; + } +} + +async function runOpenAI(job: DequeuedJob) { + const jobId = job.id; + + const inferenceClient = InferenceClientFactory.build(); + if (!inferenceClient) { + logger.debug( + `[inference][${jobId}] No inference client configured, nothing to do now`, + ); + return; + } + + const request = zOpenAIRequestSchema.safeParse(job.data); + if (!request.success) { + throw new Error( + `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + } + + const { bookmarkId } = request.data; + switch (request.data.type) { + case "summarize": + await runSummarization(bookmarkId, job, inferenceClient); + break; + case "tag": + await runTagging(bookmarkId, job, inferenceClient); + break; + default: + throw new Error(`Unknown inference type: ${request.data.type}`); + } +} diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts new file mode 100644 index 00000000..a832fe0a --- /dev/null +++ b/apps/workers/workers/inference/summarize.ts @@ -0,0 +1,123 @@ +import { and, eq } from "drizzle-orm"; +import { DequeuedJob } from "liteque"; + +import { db } from "@karakeep/db"; +import { bookmarks, customPrompts } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; +import { InferenceClient } from "@karakeep/shared/inference"; +import logger from "@karakeep/shared/logger"; +import { buildSummaryPrompt } from "@karakeep/shared/prompts"; +import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +async function fetchBookmarkDetailsForSummary(bookmarkId: string) { + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + columns: { id: true, userId: true, type: true }, + with: { + link: { + columns: { + title: true, + description: true, + content: true, + publisher: true, + author: true, + url: true, + }, + }, + // If assets (like PDFs with extracted text) should be summarized, extend here + }, + }); + + if (!bookmark) { + throw new Error(`Bookmark with id ${bookmarkId} not found`); + } + return bookmark; +} + +export async function runSummarization( + bookmarkId: string, + job: DequeuedJob, + inferenceClient: InferenceClient, +) { + if (!serverConfig.inference.enableAutoSummarization) { + logger.info( + `[inference][${job.id}] Skipping summarization job for bookmark with id "${bookmarkId}" because it's disabled in the config.`, + ); + return; + } + const jobId = job.id; + + logger.info( + `[inference][${jobId}] Starting a summary job for bookmark with id "${bookmarkId}"`, + ); + + const bookmarkData = 
await fetchBookmarkDetailsForSummary(bookmarkId); + + let textToSummarize = ""; + if (bookmarkData.type === BookmarkTypes.LINK && bookmarkData.link) { + const link = bookmarkData.link; + textToSummarize = ` +Title: ${link.title ?? ""} +Description: ${link.description ?? ""} +Content: ${link.content ?? ""} +Publisher: ${link.publisher ?? ""} +Author: ${link.author ?? ""} +URL: ${link.url ?? ""} +`; + } else { + logger.warn( + `[inference][${jobId}] Bookmark ${bookmarkId} (type: ${bookmarkData.type}) is not a LINK or TEXT type with content, or content is missing. Skipping summary.`, + ); + return; + } + + if (!textToSummarize.trim()) { + logger.info( + `[inference][${jobId}] No content to summarize for bookmark ${bookmarkId}.`, + ); + return; + } + + const prompts = await db.query.customPrompts.findMany({ + where: and( + eq(customPrompts.userId, bookmarkData.userId), + eq(customPrompts.appliesTo, "summary"), + ), + columns: { + text: true, + }, + }); + + const summaryPrompt = buildSummaryPrompt( + serverConfig.inference.inferredTagLang, + prompts.map((p) => p.text), + textToSummarize, + serverConfig.inference.contextLength, + ); + + const summaryResult = await inferenceClient.inferFromText(summaryPrompt, { + schema: null, // Summaries are typically free-form text + abortSignal: job.abortSignal, + }); + + if (!summaryResult.response) { + throw new Error( + `[inference][${jobId}] Failed to summarize bookmark ${bookmarkId}, empty response from inference client.`, + ); + } + + logger.info( + `[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`, + ); + + await db + .update(bookmarks) + .set({ + summary: summaryResult.response, + modifiedAt: new Date(), + }) + .where(eq(bookmarks.id, bookmarkId)); + + await triggerSearchReindex(bookmarkId); +} diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts new file mode 100644 index 00000000..35c366c7 --- /dev/null +++ b/apps/workers/workers/inference/tagging.ts @@ -0,0 +1,399 @@ +import { and, Column, eq, inArray, sql } from "drizzle-orm"; +import { DequeuedJob } from "liteque"; +import { buildImpersonatingTRPCClient } from "trpc"; +import { z } from "zod"; + +import type { InferenceClient } from "@karakeep/shared/inference"; +import type { ZOpenAIRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { + bookmarks, + bookmarkTags, + customPrompts, + tagsOnBookmarks, +} from "@karakeep/db/schema"; +import { readAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; +import { + triggerRuleEngineOnEvent, + triggerSearchReindex, + triggerWebhook, +} from "@karakeep/shared/queues"; + +const openAIResponseSchema = z.object({ + tags: z.array(z.string()), +}); + +function tagNormalizer(col: Column) { + function normalizeTag(tag: string) { + return tag.toLowerCase().replace(/[ \-_]/g, ""); + } + + return { + normalizeTag, + sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`, + }; +} +async function buildPrompt( + bookmark: NonNullable>>, +) { + const prompts = await fetchCustomPrompts(bookmark.userId, "text"); + if (bookmark.link) { + if (!bookmark.link.description && !bookmark.link.content) { + throw new Error( + `No content found for link "${bookmark.id}". 
Skipping ...`, + ); + } + + const content = bookmark.link.content; + return buildTextPrompt( + serverConfig.inference.inferredTagLang, + prompts, + `URL: ${bookmark.link.url} +Title: ${bookmark.link.title ?? ""} +Description: ${bookmark.link.description ?? ""} +Content: ${content ?? ""}`, + serverConfig.inference.contextLength, + ); + } + + if (bookmark.text) { + return buildTextPrompt( + serverConfig.inference.inferredTagLang, + prompts, + bookmark.text.text ?? "", + serverConfig.inference.contextLength, + ); + } + + throw new Error("Unknown bookmark type"); +} + +async function inferTagsFromImage( + jobId: string, + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + const { asset, metadata } = await readAsset({ + userId: bookmark.userId, + assetId: bookmark.asset.assetId, + }); + + if (!asset) { + throw new Error( + `[inference][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmark.id} not found`, + ); + } + + const base64 = asset.toString("base64"); + return inferenceClient.inferFromImage( + buildImagePrompt( + serverConfig.inference.inferredTagLang, + await fetchCustomPrompts(bookmark.userId, "images"), + ), + metadata.contentType, + base64, + { schema: openAIResponseSchema, abortSignal }, + ); +} + +async function fetchCustomPrompts( + userId: string, + appliesTo: "text" | "images", +) { + const prompts = await db.query.customPrompts.findMany({ + where: and( + eq(customPrompts.userId, userId), + inArray(customPrompts.appliesTo, ["all_tagging", appliesTo]), + ), + columns: { + text: true, + }, + }); + + let promptTexts = prompts.map((p) => p.text); + if (containsTagsPlaceholder(prompts)) { + promptTexts = await replaceTagsPlaceholders(promptTexts, userId); + } + + return promptTexts; +} + +async function replaceTagsPlaceholders( + prompts: string[], + userId: string, +): Promise { + const api = await buildImpersonatingTRPCClient(userId); + const tags = (await api.tags.list()).tags; + const tagsString = `[${tags.map((tag) => tag.name).join(", ")}]`; + const aiTagsString = `[${tags + .filter((tag) => tag.numBookmarksByAttachedType.human ?? 0 == 0) + .map((tag) => tag.name) + .join(", ")}]`; + const userTagsString = `[${tags + .filter((tag) => tag.numBookmarksByAttachedType.human ?? 
0 > 0) + .map((tag) => tag.name) + .join(", ")}]`; + + return prompts.map((p) => + p + .replaceAll("$tags", tagsString) + .replaceAll("$aiTags", aiTagsString) + .replaceAll("$userTags", userTagsString), + ); +} + +function containsTagsPlaceholder(prompts: { text: string }[]): boolean { + return ( + prompts.filter( + (p) => + p.text.includes("$tags") || + p.text.includes("$aiTags") || + p.text.includes("$userTags"), + ).length > 0 + ); +} + +async function inferTagsFromPDF( + _jobId: string, + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + const prompt = buildTextPrompt( + serverConfig.inference.inferredTagLang, + await fetchCustomPrompts(bookmark.userId, "text"), + `Content: ${bookmark.asset.content}`, + serverConfig.inference.contextLength, + ); + return inferenceClient.inferFromText(prompt, { + schema: openAIResponseSchema, + abortSignal, + }); +} + +async function inferTagsFromText( + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + return await inferenceClient.inferFromText(await buildPrompt(bookmark), { + schema: openAIResponseSchema, + abortSignal, + }); +} + +async function inferTags( + jobId: string, + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + let response; + if (bookmark.link || bookmark.text) { + response = await inferTagsFromText(bookmark, inferenceClient, abortSignal); + } else if (bookmark.asset) { + switch (bookmark.asset.assetType) { + case "image": + response = await inferTagsFromImage( + jobId, + bookmark, + inferenceClient, + abortSignal, + ); + break; + case "pdf": + response = await inferTagsFromPDF( + jobId, + bookmark, + inferenceClient, + abortSignal, + ); + break; + default: + throw new Error(`[inference][${jobId}] Unsupported bookmark type`); + } + } else { + throw new Error(`[inference][${jobId}] Unsupported bookmark type`); + } + + if (!response) { + throw new Error(`[inference][${jobId}] Inference response is empty`); + } + + try { + let tags = openAIResponseSchema.parse(JSON.parse(response.response)).tags; + logger.info( + `[inference][${jobId}] Inferring tag for bookmark "${bookmark.id}" used ${response.totalTokens} tokens and inferred: ${tags}`, + ); + + // Sometimes the tags contain the hashtag symbol, let's strip them out if they do. + // Additionally, trim the tags to prevent whitespaces at the beginning/the end of the tag. + tags = tags.map((t) => { + let tag = t; + if (tag.startsWith("#")) { + tag = t.slice(1); + } + return tag.trim(); + }); + + return tags; + } catch (e) { + const responseSneak = response.response.substring(0, 20); + throw new Error( + `[inference][${jobId}] The model ignored our prompt and didn't respond with the expected JSON: ${JSON.stringify(e)}. 
Here's a sneak peak from the response: ${responseSneak}`, + ); + } +} + +async function connectTags( + bookmarkId: string, + inferredTags: string[], + userId: string, +) { + if (inferredTags.length == 0) { + return; + } + + await db.transaction(async (tx) => { + // Attempt to match exiting tags with the new ones + const { matchedTagIds, notFoundTagNames } = await (async () => { + const { normalizeTag, sql: normalizedTagSql } = tagNormalizer( + bookmarkTags.name, + ); + const normalizedInferredTags = inferredTags.map((t) => ({ + originalTag: t, + normalizedTag: normalizeTag(t), + })); + + const matchedTags = await tx.query.bookmarkTags.findMany({ + where: and( + eq(bookmarkTags.userId, userId), + inArray( + normalizedTagSql, + normalizedInferredTags.map((t) => t.normalizedTag), + ), + ), + }); + + const matchedTagIds = matchedTags.map((r) => r.id); + const notFoundTagNames = normalizedInferredTags + .filter( + (t) => + !matchedTags.some( + (mt) => normalizeTag(mt.name) === t.normalizedTag, + ), + ) + .map((t) => t.originalTag); + + return { matchedTagIds, notFoundTagNames }; + })(); + + // Create tags that didn't exist previously + let newTagIds: string[] = []; + if (notFoundTagNames.length > 0) { + newTagIds = ( + await tx + .insert(bookmarkTags) + .values( + notFoundTagNames.map((t) => ({ + name: t, + userId, + })), + ) + .onConflictDoNothing() + .returning() + ).map((t) => t.id); + } + + // Delete old AI tags + const detachedTags = await tx + .delete(tagsOnBookmarks) + .where( + and( + eq(tagsOnBookmarks.attachedBy, "ai"), + eq(tagsOnBookmarks.bookmarkId, bookmarkId), + ), + ) + .returning(); + + const allTagIds = new Set([...matchedTagIds, ...newTagIds]); + + // Attach new ones + const attachedTags = await tx + .insert(tagsOnBookmarks) + .values( + [...allTagIds].map((tagId) => ({ + tagId, + bookmarkId, + attachedBy: "ai" as const, + })), + ) + .onConflictDoNothing() + .returning(); + + await triggerRuleEngineOnEvent(bookmarkId, [ + ...detachedTags.map((t) => ({ + type: "tagRemoved" as const, + tagId: t.tagId, + })), + ...attachedTags.map((t) => ({ + type: "tagAdded" as const, + tagId: t.tagId, + })), + ]); + }); +} + +async function fetchBookmark(linkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, linkId), + with: { + link: true, + text: true, + asset: true, + }, + }); +} + +export async function runTagging( + bookmarkId: string, + job: DequeuedJob, + inferenceClient: InferenceClient, +) { + if (!serverConfig.inference.enableAutoTagging) { + logger.info( + `[inference][${job.id}] Skipping tagging job for bookmark with id "${bookmarkId}" because it's disabled in the config.`, + ); + return; + } + const jobId = job.id; + const bookmark = await fetchBookmark(bookmarkId); + if (!bookmark) { + throw new Error( + `[inference][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + + logger.info( + `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`, + ); + + const tags = await inferTags( + jobId, + bookmark, + inferenceClient, + job.abortSignal, + ); + + await connectTags(bookmarkId, tags, bookmark.userId); + + // Trigger a webhook + await triggerWebhook(bookmarkId, "ai tagged"); + + // Update the search index + await triggerSearchReindex(bookmarkId); +} diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts new file mode 100644 index 00000000..427cc383 --- /dev/null +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -0,0 +1,86 @@ +import { eq } from "drizzle-orm"; 
+import { DequeuedJob, Runner } from "liteque"; +import { buildImpersonatingAuthedContext } from "trpc"; + +import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { + RuleEngineQueue, + zRuleEngineRequestSchema, +} from "@karakeep/shared/queues"; +import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; + +export class RuleEngineWorker { + static build() { + logger.info("Starting rule engine worker ..."); + const worker = new Runner( + RuleEngineQueue, + { + run: runRuleEngine, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[ruleEngine][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 10, + validator: zRuleEngineRequestSchema, + }, + ); + + return worker; + } +} + +async function getBookmarkUserId(bookmarkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + columns: { + userId: true, + }, + }); +} + +async function runRuleEngine(job: DequeuedJob) { + const jobId = job.id; + const { bookmarkId, events } = job.data; + + const bookmark = await getBookmarkUserId(bookmarkId); + if (!bookmark) { + throw new Error( + `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + const userId = bookmark.userId; + const authedCtx = await buildImpersonatingAuthedContext(userId); + + const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); + + const results = ( + await Promise.all(events.map((event) => ruleEngine.onEvent(event))) + ).flat(); + + if (results.length == 0) { + return; + } + + const message = results + .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`) + .join("\n"); + + logger.info( + `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`, + ); +} diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts new file mode 100644 index 00000000..e7b827a9 --- /dev/null +++ b/apps/workers/workers/searchWorker.ts @@ -0,0 +1,156 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; + +import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { + SearchIndexingQueue, + zSearchIndexingRequestSchema, +} from "@karakeep/shared/queues"; +import { getSearchIdxClient } from "@karakeep/shared/search"; + +export class SearchIndexingWorker { + static build() { + logger.info("Starting search indexing worker ..."); + const worker = new Runner( + SearchIndexingQueue, + { + run: runSearchIndexing, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[search][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function ensureTaskSuccess( + searchClient: NonNullable>>, + taskUid: number, +) { + const 
task = await searchClient.waitForTask(taskUid); + if (task.error) { + throw new Error(`Search task failed: ${task.error.message}`); + } +} + +async function runIndex( + searchClient: NonNullable>>, + bookmarkId: string, +) { + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + link: true, + text: true, + asset: true, + tagsOnBookmarks: { + with: { + tag: true, + }, + }, + }, + }); + + if (!bookmark) { + throw new Error(`Bookmark ${bookmarkId} not found`); + } + + const task = await searchClient.addDocuments( + [ + { + id: bookmark.id, + userId: bookmark.userId, + ...(bookmark.link + ? { + url: bookmark.link.url, + linkTitle: bookmark.link.title, + description: bookmark.link.description, + content: bookmark.link.content, + publisher: bookmark.link.publisher, + author: bookmark.link.author, + datePublished: bookmark.link.datePublished, + dateModified: bookmark.link.dateModified, + } + : undefined), + ...(bookmark.asset + ? { + content: bookmark.asset.content, + metadata: bookmark.asset.metadata, + } + : undefined), + ...(bookmark.text ? { content: bookmark.text.text } : undefined), + note: bookmark.note, + summary: bookmark.summary, + title: bookmark.title, + createdAt: bookmark.createdAt.toISOString(), + tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), + }, + ], + { + primaryKey: "id", + }, + ); + await ensureTaskSuccess(searchClient, task.taskUid); +} + +async function runDelete( + searchClient: NonNullable>>, + bookmarkId: string, +) { + const task = await searchClient.deleteDocument(bookmarkId); + await ensureTaskSuccess(searchClient, task.taskUid); +} + +async function runSearchIndexing(job: DequeuedJob) { + const jobId = job.id; + + const request = zSearchIndexingRequestSchema.safeParse(job.data); + if (!request.success) { + throw new Error( + `[search][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + } + + const searchClient = await getSearchIdxClient(); + if (!searchClient) { + logger.debug( + `[search][${jobId}] Search is not configured, nothing to do now`, + ); + return; + } + + const bookmarkId = request.data.bookmarkId; + logger.info( + `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`, + ); + + switch (request.data.type) { + case "index": { + await runIndex(searchClient, bookmarkId); + break; + } + case "delete": { + await runDelete(searchClient, bookmarkId); + break; + } + } +} diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts new file mode 100644 index 00000000..d4c8abdb --- /dev/null +++ b/apps/workers/workers/tidyAssetsWorker.ts @@ -0,0 +1,107 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; + +import { db } from "@karakeep/db"; +import { assets } from "@karakeep/db/schema"; +import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { + TidyAssetsQueue, + ZTidyAssetsRequest, + zTidyAssetsRequestSchema, +} from "@karakeep/shared/queues"; + +export class TidyAssetsWorker { + static build() { + logger.info("Starting tidy assets worker ..."); + const worker = new Runner( + TidyAssetsQueue, + { + run: runTidyAssets, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[tidyAssets][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, + ); + return 
Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function handleAsset( + asset: { + assetId: string; + userId: string; + size: number; + contentType: string; + fileName?: string | null; + }, + request: ZTidyAssetsRequest, + jobId: string, +) { + const dbRow = await db.query.assets.findFirst({ + where: eq(assets.id, asset.assetId), + }); + if (!dbRow) { + if (request.cleanDanglingAssets) { + await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); + logger.info( + `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, + ); + } else { + logger.warn( + `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Not deleting it because cleanDanglingAssets is false.`, + ); + } + return; + } + + if (request.syncAssetMetadata) { + await db + .update(assets) + .set({ + contentType: asset.contentType, + fileName: asset.fileName, + size: asset.size, + }) + .where(eq(assets.id, asset.assetId)); + logger.info( + `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`, + ); + } +} + +async function runTidyAssets(job: DequeuedJob) { + const jobId = job.id; + + const request = zTidyAssetsRequestSchema.safeParse(job.data); + if (!request.success) { + throw new Error( + `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + } + + for await (const asset of getAllAssets()) { + try { + handleAsset(asset, request.data, jobId); + } catch (e) { + logger.error( + `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`, + ); + } + } +} diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts new file mode 100644 index 00000000..3fa3e49e --- /dev/null +++ b/apps/workers/workers/videoWorker.ts @@ -0,0 +1,214 @@ +import fs from "fs"; +import * as os from "os"; +import path from "path"; +import { execa } from "execa"; +import { DequeuedJob, Runner } from "liteque"; + +import { db } from "@karakeep/db"; +import { AssetTypes } from "@karakeep/db/schema"; +import { + ASSET_TYPES, + getAssetSize, + newAssetId, + saveAssetFromFile, + silentDeleteAsset, +} from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + VideoWorkerQueue, + ZVideoRequest, + zvideoRequestSchema, +} from "@karakeep/shared/queues"; + +import { withTimeout } from "../utils"; +import { getBookmarkDetails, updateAsset } from "../workerUtils"; + +const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads"); + +export class VideoWorker { + static build() { + logger.info("Starting video worker ..."); + + return new Runner( + VideoWorkerQueue, + { + run: withTimeout( + runWorker, + /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout, + ), + onComplete: async (job) => { + const jobId = job.id; + logger.info( + `[VideoCrawler][${jobId}] Video Download Completed successfully`, + ); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`, + ); + return Promise.resolve(); + }, + }, + { + pollIntervalMs: 1000, + timeoutSecs: serverConfig.crawler.downloadVideoTimeout, + concurrency: 1, + validator: zvideoRequestSchema, + }, + ); + } +} + +function prepareYtDlpArguments(url: string, assetPath: string) { + const ytDlpArguments = [url]; + if (serverConfig.crawler.maxVideoDownloadSize > 0) { + ytDlpArguments.push( + "-f", + 
`best[filesize<${serverConfig.crawler.maxVideoDownloadSize}M]`, + ); + } + + ytDlpArguments.push(...serverConfig.crawler.ytDlpArguments); + ytDlpArguments.push("-o", assetPath); + ytDlpArguments.push("--no-playlist"); + return ytDlpArguments; +} + +async function runWorker(job: DequeuedJob) { + const jobId = job.id; + const { bookmarkId } = job.data; + + const { + url, + userId, + videoAssetId: oldVideoAssetId, + } = await getBookmarkDetails(bookmarkId); + + if (!serverConfig.crawler.downloadVideo) { + logger.info( + `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it is disabled in the config.`, + ); + return; + } + + const videoAssetId = newAssetId(); + let assetPath = `${TMP_FOLDER}/${videoAssetId}`; + await fs.promises.mkdir(TMP_FOLDER, { recursive: true }); + + const ytDlpArguments = prepareYtDlpArguments(url, assetPath); + + try { + logger.info( + `[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`, + ); + + await execa("yt-dlp", ytDlpArguments, { + cancelSignal: job.abortSignal, + }); + const downloadPath = await findAssetFile(videoAssetId); + if (!downloadPath) { + logger.info( + "[VideoCrawler][${jobId}] yt-dlp didn't download anything. Skipping ...", + ); + return; + } + assetPath = downloadPath; + } catch (e) { + const err = e as Error; + if ( + err.message.includes("ERROR: Unsupported URL:") || + err.message.includes("No media found") + ) { + logger.info( + `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it's not one of the supported yt-dlp URLs`, + ); + return; + } + logger.error( + `[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`, + ); + await deleteLeftOverAssetFile(jobId, videoAssetId); + return; + } + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`, + ); + await saveAssetFromFile({ + userId, + assetId: videoAssetId, + assetPath, + metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, + }); + + await db.transaction(async (txn) => { + await updateAsset( + oldVideoAssetId, + { + id: videoAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_VIDEO, + contentType: ASSET_TYPES.VIDEO_MP4, + size: await getAssetSize({ userId, assetId: videoAssetId }), + }, + txn, + ); + }); + await silentDeleteAsset(userId, oldVideoAssetId); + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, + ); +} + +/** + * Deletes leftover assets in case the download fails + * + * @param jobId the id of the job + * @param assetId the id of the asset to delete + */ +async function deleteLeftOverAssetFile( + jobId: string, + assetId: string, +): Promise { + let assetFile; + try { + assetFile = await findAssetFile(assetId); + } catch { + // ignore exception, no asset file was found + return; + } + if (!assetFile) { + return; + } + logger.info( + `[VideoCrawler][${jobId}] Deleting leftover video asset "${assetFile}".`, + ); + try { + await fs.promises.rm(assetFile); + } catch (e) { + logger.error( + `[VideoCrawler][${jobId}] Failed deleting leftover video asset "${assetFile}".`, + ); + } +} + +/** + * yt-dlp automatically adds a file ending to the passed in filename --> we have to search it again in the folder + * + * @param assetId the id of the asset to search + * @returns the path to the downloaded asset + */ +async function findAssetFile(assetId: string): Promise { + const files = await 
fs.promises.readdir(TMP_FOLDER); + for (const file of files) { + if (file.startsWith(assetId)) { + return path.join(TMP_FOLDER, file); + } + } + return null; +} diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts new file mode 100644 index 00000000..9d3ed2c1 --- /dev/null +++ b/apps/workers/workers/webhookWorker.ts @@ -0,0 +1,146 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import fetch from "node-fetch"; + +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + WebhookQueue, + ZWebhookRequest, + zWebhookRequestSchema, +} from "@karakeep/shared/queues"; + +export class WebhookWorker { + static build() { + logger.info("Starting webhook worker ..."); + const worker = new Runner( + WebhookQueue, + { + run: runWebhook, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[webhook][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: + serverConfig.webhook.timeoutSec * + (serverConfig.webhook.retryTimes + 1) + + 1, //consider retry times, and timeout and add 1 second for other stuff + validator: zWebhookRequestSchema, + }, + ); + + return worker; + } +} + +async function fetchBookmark(bookmarkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + link: { + columns: { + url: true, + }, + }, + user: { + columns: {}, + with: { + webhooks: true, + }, + }, + }, + }); +} + +async function runWebhook(job: DequeuedJob) { + const jobId = job.id; + const webhookTimeoutSec = serverConfig.webhook.timeoutSec; + + const { bookmarkId } = job.data; + const bookmark = await fetchBookmark(bookmarkId); + if (!bookmark) { + throw new Error( + `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + + if (!bookmark.user.webhooks) { + return; + } + + logger.info( + `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id} for operation "${job.data.operation}"`, + ); + + await Promise.allSettled( + bookmark.user.webhooks + .filter((w) => w.events.includes(job.data.operation)) + .map(async (webhook) => { + const url = webhook.url; + const webhookToken = webhook.token; + const maxRetries = serverConfig.webhook.retryTimes; + let attempt = 0; + let success = false; + + while (attempt < maxRetries && !success) { + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(webhookToken + ? { + Authorization: `Bearer ${webhookToken}`, + } + : {}), + }, + body: JSON.stringify({ + jobId, + bookmarkId, + userId: bookmark.userId, + url: bookmark.link ? 
bookmark.link.url : undefined, + type: bookmark.type, + operation: job.data.operation, + }), + signal: AbortSignal.timeout(webhookTimeoutSec * 1000), + }); + + if (!response.ok) { + logger.error( + `Webhook call to ${url} failed with status: ${response.status}`, + ); + } else { + logger.info( + `[webhook][${jobId}] Webhook to ${url} call succeeded`, + ); + success = true; + } + } catch (error) { + logger.error( + `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`, + ); + } + attempt++; + if (!success && attempt < maxRetries) { + logger.info( + `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`, + ); + } + } + }), + ); +} -- cgit v1.2.3-70-g09d2
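
The two new inference modules introduced by this change (workers/inference/summarize.ts and workers/inference/tagging.ts) are both driven by the shared inference worker added earlier in the patch. As a rough illustration of how a dequeued job might be routed between them, here is a minimal TypeScript sketch. Only runTagging and the queue/inference types are visible in this excerpt; the runSummarization export name, the relative import paths, and the "type" discriminator on the job payload are assumptions made for illustration, so the real inferenceWorker.ts may differ.

import { DequeuedJob } from "liteque";

import type { InferenceClient } from "@karakeep/shared/inference";
import type { ZOpenAIRequest } from "@karakeep/shared/queues";

import { runSummarization } from "./summarize"; // assumed export name
import { runTagging } from "./tagging"; // exported in the hunk above

// Route a dequeued inference job to either tagging or summarization.
// The payload shape (a bookmarkId plus a "type" discriminator) is an
// assumption for illustration; the actual zod schema lives in
// @karakeep/shared/queues and is not shown in this excerpt.
export async function routeInferenceJob(
  job: DequeuedJob<ZOpenAIRequest>,
  inferenceClient: InferenceClient,
): Promise<void> {
  const data = job.data as { bookmarkId: string; type?: "tag" | "summarize" };

  switch (data.type) {
    case "summarize":
      await runSummarization(data.bookmarkId, job, inferenceClient);
      break;
    case "tag":
    default:
      await runTagging(data.bookmarkId, job, inferenceClient);
      break;
  }
}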
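
For integrators consuming the webhook worker above: the worker POSTs a JSON body of the form { jobId, bookmarkId, userId, url, type, operation } and, when a token is configured for the webhook, an "Authorization: Bearer <token>" header. The following is a minimal receiver sketch using Node's built-in http module; the port, the WEBHOOK_TOKEN environment variable, and the absence of path routing are illustrative assumptions and not part of this patch.

import http from "node:http";

// Shape of the JSON body posted by the webhook worker (url is only present
// for link bookmarks).
interface KarakeepWebhookPayload {
  jobId: string;
  bookmarkId: string;
  userId: string;
  url?: string;
  type: string;
  operation: string;
}

// WEBHOOK_TOKEN and the port are illustrative assumptions; configure them to
// match your own deployment.
const EXPECTED_TOKEN = process.env.WEBHOOK_TOKEN;

const server = http.createServer((req, res) => {
  if (req.method !== "POST") {
    res.writeHead(405).end();
    return;
  }

  // The worker only sends the bearer header when a token is configured for
  // the webhook, so enforce it only when one is expected on this side too.
  if (
    EXPECTED_TOKEN &&
    req.headers.authorization !== `Bearer ${EXPECTED_TOKEN}`
  ) {
    res.writeHead(401).end();
    return;
  }

  let body = "";
  req.on("data", (chunk: Buffer) => {
    body += chunk.toString();
  });
  req.on("end", () => {
    try {
      const payload = JSON.parse(body) as KarakeepWebhookPayload;
      console.log(
        `bookmark ${payload.bookmarkId}: ${payload.operation} (user ${payload.userId})`,
      );
      res.writeHead(200).end();
    } catch {
      res.writeHead(400).end();
    }
  });
});

server.listen(8080);

Since the worker retries up to serverConfig.webhook.retryTimes times and aborts each attempt after serverConfig.webhook.timeoutSec seconds, a receiver like this should respond quickly and treat repeated deliveries of the same event as idempotent.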