From 2743d9e38ecfdbf757d4d2f97bcf09d601245b59 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 18 May 2025 20:22:59 +0000 Subject: feat: Add AI auto summarization. Fixes #1163 --- apps/web/components/admin/BackgroundJobs.tsx | 24 +- apps/web/lib/i18n/locales/en/translation.json | 2 + apps/workers/assetPreprocessingWorker.ts | 338 ---- apps/workers/crawlerWorker.ts | 877 --------- apps/workers/feedWorker.ts | 215 --- apps/workers/index.ts | 27 +- apps/workers/openaiWorker.ts | 463 ----- apps/workers/ruleEngineWorker.ts | 86 - apps/workers/searchWorker.ts | 156 -- apps/workers/tidyAssetsWorker.ts | 107 -- apps/workers/videoWorker.ts | 214 --- apps/workers/webhookWorker.ts | 146 -- apps/workers/workers/assetPreprocessingWorker.ts | 339 ++++ apps/workers/workers/crawlerWorker.ts | 882 +++++++++ apps/workers/workers/feedWorker.ts | 215 +++ apps/workers/workers/inference/inferenceWorker.ts | 100 + apps/workers/workers/inference/summarize.ts | 123 ++ apps/workers/workers/inference/tagging.ts | 399 ++++ apps/workers/workers/ruleEngineWorker.ts | 86 + apps/workers/workers/searchWorker.ts | 156 ++ apps/workers/workers/tidyAssetsWorker.ts | 107 ++ apps/workers/workers/videoWorker.ts | 214 +++ apps/workers/workers/webhookWorker.ts | 146 ++ docs/docs/03-configuration.md | 22 +- .../db/drizzle/0047_add_summarization_status.sql | 1 + packages/db/drizzle/meta/0047_snapshot.json | 1967 ++++++++++++++++++++ packages/db/drizzle/meta/_journal.json | 7 + packages/db/schema.ts | 3 + packages/shared-react/utils/bookmarkUtils.ts | 13 +- packages/shared/config.ts | 4 + packages/shared/queues.ts | 5 +- packages/shared/types/bookmarks.ts | 1 + packages/trpc/routers/admin.ts | 36 +- packages/trpc/routers/bookmarks.ts | 1 + 34 files changed, 4843 insertions(+), 2639 deletions(-) delete mode 100644 apps/workers/assetPreprocessingWorker.ts delete mode 100644 apps/workers/crawlerWorker.ts delete mode 100644 apps/workers/feedWorker.ts delete mode 100644 apps/workers/openaiWorker.ts delete 
mode 100644 apps/workers/ruleEngineWorker.ts delete mode 100644 apps/workers/searchWorker.ts delete mode 100644 apps/workers/tidyAssetsWorker.ts delete mode 100644 apps/workers/videoWorker.ts delete mode 100644 apps/workers/webhookWorker.ts create mode 100644 apps/workers/workers/assetPreprocessingWorker.ts create mode 100644 apps/workers/workers/crawlerWorker.ts create mode 100644 apps/workers/workers/feedWorker.ts create mode 100644 apps/workers/workers/inference/inferenceWorker.ts create mode 100644 apps/workers/workers/inference/summarize.ts create mode 100644 apps/workers/workers/inference/tagging.ts create mode 100644 apps/workers/workers/ruleEngineWorker.ts create mode 100644 apps/workers/workers/searchWorker.ts create mode 100644 apps/workers/workers/tidyAssetsWorker.ts create mode 100644 apps/workers/workers/videoWorker.ts create mode 100644 apps/workers/workers/webhookWorker.ts create mode 100644 packages/db/drizzle/0047_add_summarization_status.sql create mode 100644 packages/db/drizzle/meta/0047_snapshot.json diff --git a/apps/web/components/admin/BackgroundJobs.tsx b/apps/web/components/admin/BackgroundJobs.tsx index 217e2ad9..ac5885ef 100644 --- a/apps/web/components/admin/BackgroundJobs.tsx +++ b/apps/web/components/admin/BackgroundJobs.tsx @@ -127,7 +127,7 @@ function AdminActions() { variant="destructive" loading={isInferencePending} onClick={() => - reRunInferenceOnAllBookmarks({ taggingStatus: "failure" }) + reRunInferenceOnAllBookmarks({ type: "tag", status: "failure" }) } > {t("admin.actions.regenerate_ai_tags_for_failed_bookmarks_only")} @@ -135,10 +135,30 @@ function AdminActions() { reRunInferenceOnAllBookmarks({ taggingStatus: "all" })} + onClick={() => + reRunInferenceOnAllBookmarks({ type: "tag", status: "all" }) + } > {t("admin.actions.regenerate_ai_tags_for_all_bookmarks")} + + reRunInferenceOnAllBookmarks({ type: "summarize", status: "failure" }) + } + > + {t("admin.actions.regenerate_ai_summaries_for_failed_bookmarks_only")} + + + 
reRunInferenceOnAllBookmarks({ type: "summarize", status: "all" }) + } + > + {t("admin.actions.regenerate_ai_summaries_for_all_bookmarks")} + ( - AssetPreprocessingQueue, - { - run: run, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function readImageText(buffer: Buffer) { - if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") { - return null; - } - const worker = await createWorker(serverConfig.ocr.langs, undefined, { - cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(), - }); - try { - const ret = await worker.recognize(buffer); - if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) { - return null; - } - return ret.data.text; - } finally { - await worker.terminate(); - } -} - -async function readPDFText(buffer: Buffer): Promise<{ - text: string; - metadata: Record; -}> { - return new Promise((resolve, reject) => { - const pdfParser = new PDFParser(null, true); - pdfParser.on("pdfParser_dataError", reject); - pdfParser.on("pdfParser_dataReady", (pdfData) => { - resolve({ - text: pdfParser.getRawTextContent(), - metadata: pdfData.Meta, - }); - }); - pdfParser.parseBuffer(buffer); - }); -} - -export async function extractAndSavePDFScreenshot( - jobId: string, - asset: Buffer, - bookmark: NonNullable>>, - isFixMode: boolean, -): Promise { - { - const alreadyHasScreenshot = - bookmark.assets.find( - (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT, - ) !== undefined; - if (alreadyHasScreenshot && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already 
been generated.`, - ); - return false; - } - } - logger.info( - `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`, - ); - try { - /** - * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL - * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md - */ - const screenshot = await fromBuffer(asset, { - density: 100, - quality: 100, - format: "png", - preserveAspectRatio: true, - })(1, { responseType: "buffer" }); - - if (!screenshot.buffer) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`, - ); - return false; - } - - // Store the screenshot - const assetId = newAssetId(); - const fileName = "screenshot.png"; - const contentType = "image/png"; - await saveAsset({ - userId: bookmark.userId, - assetId, - asset: screenshot.buffer, - metadata: { - contentType, - fileName, - }, - }); - - // Insert into database - await db.insert(assets).values({ - id: assetId, - bookmarkId: bookmark.id, - userId: bookmark.userId, - assetType: AssetTypes.ASSET_SCREENSHOT, - contentType, - size: screenshot.buffer.byteLength, - fileName, - }); - - logger.info( - `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`, - ); - return true; - } catch (error) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, - ); - return false; - } -} - -async function extractAndSaveImageText( - jobId: string, - asset: Buffer, - bookmark: NonNullable>>, - isFixMode: boolean, -): Promise { - { - const alreadyHasText = !!bookmark.asset.content; - if (alreadyHasText && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`, - ); - return false; - } - } - let imageText = null; - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, - ); - try { - imageText = 
await readImageText(asset); - } catch (e) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, - ); - } - if (!imageText) { - return false; - } - - logger.info( - `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`, - ); - await db - .update(bookmarkAssets) - .set({ - content: imageText, - metadata: null, - }) - .where(eq(bookmarkAssets.id, bookmark.id)); - return true; -} - -async function extractAndSavePDFText( - jobId: string, - asset: Buffer, - bookmark: NonNullable>>, - isFixMode: boolean, -): Promise { - { - const alreadyHasText = !!bookmark.asset.content; - if (alreadyHasText && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`, - ); - return false; - } - } - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`, - ); - const pdfParse = await readPDFText(asset); - if (!pdfParse?.text) { - throw new Error( - `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`, - ); - } - logger.info( - `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`, - ); - await db - .update(bookmarkAssets) - .set({ - content: pdfParse.text, - metadata: pdfParse.metadata ? 
JSON.stringify(pdfParse.metadata) : null, - }) - .where(eq(bookmarkAssets.id, bookmark.id)); - return true; -} - -async function getBookmark(bookmarkId: string) { - return db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - asset: true, - assets: true, - }, - }); -} - -async function run(req: DequeuedJob) { - const isFixMode = req.data.fixMode; - const jobId = req.id; - const bookmarkId = req.data.bookmarkId; - - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - asset: true, - assets: true, - }, - }); - - logger.info( - `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`, - ); - - if (!bookmark) { - throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`); - } - - if (!bookmark.asset) { - throw new Error( - `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`, - ); - } - - const { asset } = await readAsset({ - userId: bookmark.userId, - assetId: bookmark.asset.assetId, - }); - - if (!asset) { - throw new Error( - `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`, - ); - } - - let anythingChanged = false; - switch (bookmark.asset.assetType) { - case "image": { - const extractedText = await extractAndSaveImageText( - jobId, - asset, - bookmark, - isFixMode, - ); - anythingChanged ||= extractedText; - break; - } - case "pdf": { - const extractedText = await extractAndSavePDFText( - jobId, - asset, - bookmark, - isFixMode, - ); - const extractedScreenshot = await extractAndSavePDFScreenshot( - jobId, - asset, - bookmark, - isFixMode, - ); - anythingChanged ||= extractedText || extractedScreenshot; - break; - } - default: - throw new Error( - `[assetPreprocessing][${jobId}] Unsupported bookmark type`, - ); - } - - if (!isFixMode || anythingChanged) { - await OpenAIQueue.enqueue({ - bookmarkId, - }); - - // Update the search index - await 
triggerSearchReindex(bookmarkId); - } -} diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts deleted file mode 100644 index a40cbe53..00000000 --- a/apps/workers/crawlerWorker.ts +++ /dev/null @@ -1,877 +0,0 @@ -import * as dns from "dns"; -import { promises as fs } from "fs"; -import * as path from "node:path"; -import * as os from "os"; -import type { Browser } from "puppeteer"; -import { PuppeteerBlocker } from "@ghostery/adblocker-puppeteer"; -import { Readability } from "@mozilla/readability"; -import { Mutex } from "async-mutex"; -import DOMPurify from "dompurify"; -import { eq } from "drizzle-orm"; -import { execa } from "execa"; -import { isShuttingDown } from "exit"; -import { JSDOM } from "jsdom"; -import { DequeuedJob, Runner } from "liteque"; -import metascraper from "metascraper"; -import metascraperAmazon from "metascraper-amazon"; -import metascraperAuthor from "metascraper-author"; -import metascraperDate from "metascraper-date"; -import metascraperDescription from "metascraper-description"; -import metascraperImage from "metascraper-image"; -import metascraperLogo from "metascraper-logo-favicon"; -import metascraperPublisher from "metascraper-publisher"; -import metascraperReadability from "metascraper-readability"; -import metascraperTitle from "metascraper-title"; -import metascraperTwitter from "metascraper-twitter"; -import metascraperUrl from "metascraper-url"; -import fetch from "node-fetch"; -import puppeteer from "puppeteer-extra"; -import StealthPlugin from "puppeteer-extra-plugin-stealth"; -import { withTimeout } from "utils"; -import { getBookmarkDetails, updateAsset } from "workerUtils"; - -import type { ZCrawlLinkRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - assets, - AssetTypes, - bookmarkAssets, - bookmarkLinks, - bookmarks, -} from "@karakeep/db/schema"; -import { - ASSET_TYPES, - getAssetSize, - IMAGE_ASSET_TYPES, - newAssetId, - readAsset, - saveAsset, - 
saveAssetFromFile, - silentDeleteAsset, - SUPPORTED_UPLOAD_ASSET_TYPES, -} from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - AssetPreprocessingQueue, - LinkCrawlerQueue, - OpenAIQueue, - triggerSearchReindex, - triggerVideoWorker, - triggerWebhook, - zCrawlLinkRequestSchema, -} from "@karakeep/shared/queues"; -import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; - -const metascraperParser = metascraper([ - metascraperDate({ - dateModified: true, - datePublished: true, - }), - metascraperAmazon(), - metascraperReadability(), - metascraperAuthor(), - metascraperPublisher(), - metascraperTitle(), - metascraperDescription(), - metascraperTwitter(), - metascraperImage(), - metascraperLogo(), - metascraperUrl(), -]); - -let globalBrowser: Browser | undefined; -let globalBlocker: PuppeteerBlocker | undefined; -// Guards the interactions with the browser instance. -// This is needed given that most of the browser APIs are async. 
-const browserMutex = new Mutex(); - -async function startBrowserInstance() { - const defaultViewport = { - width: 1440, - height: 900, - }; - if (serverConfig.crawler.browserWebSocketUrl) { - logger.info( - `[Crawler] Connecting to existing browser websocket address: ${serverConfig.crawler.browserWebSocketUrl}`, - ); - return puppeteer.connect({ - browserWSEndpoint: serverConfig.crawler.browserWebSocketUrl, - defaultViewport, - }); - } else if (serverConfig.crawler.browserWebUrl) { - logger.info( - `[Crawler] Connecting to existing browser instance: ${serverConfig.crawler.browserWebUrl}`, - ); - const webUrl = new URL(serverConfig.crawler.browserWebUrl); - // We need to resolve the ip address as a workaround for https://github.com/puppeteer/puppeteer/issues/2242 - const { address: address } = await dns.promises.lookup(webUrl.hostname); - webUrl.hostname = address; - logger.info( - `[Crawler] Successfully resolved IP address, new address: ${webUrl.toString()}`, - ); - return puppeteer.connect({ - browserURL: webUrl.toString(), - defaultViewport, - }); - } else { - logger.info(`Running in browserless mode`); - return undefined; - } -} - -async function launchBrowser() { - globalBrowser = undefined; - await browserMutex.runExclusive(async () => { - try { - globalBrowser = await startBrowserInstance(); - } catch (e) { - logger.error( - `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${(e as Error).stack}`, - ); - if (isShuttingDown) { - logger.info("[Crawler] We're shutting down so won't retry."); - return; - } - setTimeout(() => { - launchBrowser(); - }, 5000); - return; - } - globalBrowser?.on("disconnected", () => { - if (isShuttingDown) { - logger.info( - "[Crawler] The puppeteer browser got disconnected. But we're shutting down so won't restart it.", - ); - return; - } - logger.info( - "[Crawler] The puppeteer browser got disconnected. 
Will attempt to launch it again.", - ); - launchBrowser(); - }); - }); -} - -export class CrawlerWorker { - static async build() { - puppeteer.use(StealthPlugin()); - if (serverConfig.crawler.enableAdblocker) { - try { - logger.info("[crawler] Loading adblocker ..."); - globalBlocker = await PuppeteerBlocker.fromPrebuiltFull(fetch, { - path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), - read: fs.readFile, - write: fs.writeFile, - }); - } catch (e) { - logger.error( - `[crawler] Failed to load adblocker. Will not be blocking ads: ${e}`, - ); - } - } - if (!serverConfig.crawler.browserConnectOnDemand) { - await launchBrowser(); - } else { - logger.info( - "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance", - ); - } - - logger.info("Starting crawler worker ..."); - const worker = new Runner( - LinkCrawlerQueue, - { - run: withTimeout( - runCrawler, - /* timeoutSec */ serverConfig.crawler.jobTimeoutSec, - ), - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[Crawler][${jobId}] Completed successfully`); - const bookmarkId = job.data.bookmarkId; - if (bookmarkId) { - await changeBookmarkStatus(bookmarkId, "success"); - } - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`, - ); - const bookmarkId = job.data?.bookmarkId; - if (bookmarkId && job.numRetriesLeft == 0) { - await changeBookmarkStatus(bookmarkId, "failure"); - } - }, - }, - { - pollIntervalMs: 1000, - timeoutSecs: serverConfig.crawler.jobTimeoutSec, - concurrency: serverConfig.crawler.numWorkers, - }, - ); - - return worker; - } -} - -type DBAssetType = typeof assets.$inferInsert; - -async function changeBookmarkStatus( - bookmarkId: string, - crawlStatus: "success" | "failure", -) { - await db - .update(bookmarkLinks) - .set({ - crawlStatus, - }) - .where(eq(bookmarkLinks.id, bookmarkId)); -} - -/** - * This provides some "basic" protection 
from malicious URLs. However, all of those - * can be easily circumvented by pointing dns of origin to localhost, or with - * redirects. - */ -function validateUrl(url: string) { - const urlParsed = new URL(url); - if (urlParsed.protocol != "http:" && urlParsed.protocol != "https:") { - throw new Error(`Unsupported URL protocol: ${urlParsed.protocol}`); - } - - if (["localhost", "127.0.0.1", "0.0.0.0"].includes(urlParsed.hostname)) { - throw new Error(`Link hostname rejected: ${urlParsed.hostname}`); - } -} - -async function browserlessCrawlPage( - jobId: string, - url: string, - abortSignal: AbortSignal, -) { - logger.info( - `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, - ); - const response = await fetch(url, { - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - logger.info( - `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`, - ); - return { - htmlContent: await response.text(), - statusCode: response.status, - screenshot: undefined, - url: response.url, - }; -} - -async function crawlPage( - jobId: string, - url: string, - abortSignal: AbortSignal, -): Promise<{ - htmlContent: string; - screenshot: Buffer | undefined; - statusCode: number; - url: string; -}> { - let browser: Browser | undefined; - if (serverConfig.crawler.browserConnectOnDemand) { - browser = await startBrowserInstance(); - } else { - browser = globalBrowser; - } - if (!browser) { - return browserlessCrawlPage(jobId, url, abortSignal); - } - const context = await browser.createBrowserContext(); - - try { - const page = await context.newPage(); - if (globalBlocker) { - await globalBlocker.enableBlockingInPage(page); - } - await page.setUserAgent( - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - ); - - const response = await page.goto(url, { - timeout: 
serverConfig.crawler.navigateTimeoutSec * 1000, - }); - logger.info( - `[Crawler][${jobId}] Successfully navigated to "${url}". Waiting for the page to load ...`, - ); - - // Wait until there's at most two connections for 2 seconds - // Attempt to wait only for 5 seconds - await Promise.race([ - page.waitForNetworkIdle({ - idleTime: 1000, // 1 sec - concurrency: 2, - }), - new Promise((f) => setTimeout(f, 5000)), - ]); - - logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`); - - const htmlContent = await page.content(); - logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`); - - let screenshot: Buffer | undefined = undefined; - if (serverConfig.crawler.storeScreenshot) { - try { - screenshot = await Promise.race([ - page.screenshot({ - // If you change this, you need to change the asset type in the store function. - type: "png", - encoding: "binary", - fullPage: serverConfig.crawler.fullPageScreenshot, - }), - new Promise((_, reject) => - setTimeout( - () => - reject( - "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", - ), - serverConfig.crawler.screenshotTimeoutSec * 1000, - ), - ), - ]); - logger.info( - `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, - ); - } catch (e) { - logger.warn( - `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${e}`, - ); - } - } - - return { - htmlContent, - statusCode: response?.status() ?? 
0, - screenshot, - url: page.url(), - }; - } finally { - await context.close(); - if (serverConfig.crawler.browserConnectOnDemand) { - await browser.close(); - } - } -} - -async function extractMetadata( - htmlContent: string, - url: string, - jobId: string, -) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, - ); - const meta = await metascraperParser({ - url, - html: htmlContent, - // We don't want to validate the URL again as we've already done it by visiting the page. - // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). - validateUrl: false, - }); - logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`); - return meta; -} - -function extractReadableContent( - htmlContent: string, - url: string, - jobId: string, -) { - logger.info( - `[Crawler][${jobId}] Will attempt to extract readable content ...`, - ); - const dom = new JSDOM(htmlContent, { url }); - const readableContent = new Readability(dom.window.document).parse(); - if (!readableContent || typeof readableContent.content !== "string") { - return null; - } - - const window = new JSDOM("").window; - const purify = DOMPurify(window); - const purifiedHTML = purify.sanitize(readableContent.content); - - logger.info(`[Crawler][${jobId}] Done extracting readable content.`); - return { - content: purifiedHTML, - textContent: readableContent.textContent, - }; -} - -async function storeScreenshot( - screenshot: Buffer | undefined, - userId: string, - jobId: string, -) { - if (!serverConfig.crawler.storeScreenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, - ); - return null; - } - if (!screenshot) { - logger.info( - `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, - ); - return null; - } - const assetId = newAssetId(); - const contentType = "image/png"; - const fileName = "screenshot.png"; - await saveAsset({ - userId, - 
assetId, - metadata: { contentType, fileName }, - asset: screenshot, - }); - logger.info( - `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, - ); - return { assetId, contentType, fileName, size: screenshot.byteLength }; -} - -async function downloadAndStoreFile( - url: string, - userId: string, - jobId: string, - fileType: string, - abortSignal: AbortSignal, -) { - try { - logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`); - const response = await fetch(url, { - signal: abortSignal, - }); - if (!response.ok) { - throw new Error(`Failed to download ${fileType}: ${response.status}`); - } - const buffer = await response.arrayBuffer(); - const assetId = newAssetId(); - - const contentType = response.headers.get("content-type"); - if (!contentType) { - throw new Error("No content type in the response"); - } - - await saveAsset({ - userId, - assetId, - metadata: { contentType }, - asset: Buffer.from(buffer), - }); - - logger.info( - `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId}`, - ); - - return { assetId, userId, contentType, size: buffer.byteLength }; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, - ); - return null; - } -} - -async function downloadAndStoreImage( - url: string, - userId: string, - jobId: string, - abortSignal: AbortSignal, -) { - if (!serverConfig.crawler.downloadBannerImage) { - logger.info( - `[Crawler][${jobId}] Skipping downloading the image as per the config.`, - ); - return null; - } - return downloadAndStoreFile(url, userId, jobId, "image", abortSignal); -} - -async function archiveWebpage( - html: string, - url: string, - userId: string, - jobId: string, - abortSignal: AbortSignal, -) { - logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); - const assetId = newAssetId(); - const assetPath = `/tmp/${assetId}`; - - await execa({ - input: html, - cancelSignal: abortSignal, - })("monolith", ["-", "-Ije", "-t", 
"5", "-b", url, "-o", assetPath]); - - const contentType = "text/html"; - - await saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata: { - contentType, - }, - }); - - logger.info( - `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, - ); - - return { - assetId, - contentType, - size: await getAssetSize({ userId, assetId }), - }; -} - -async function getContentType( - url: string, - jobId: string, - abortSignal: AbortSignal, -): Promise { - try { - logger.info( - `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, - ); - const response = await fetch(url, { - method: "HEAD", - signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), - }); - const contentType = response.headers.get("content-type"); - logger.info( - `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, - ); - return contentType; - } catch (e) { - logger.error( - `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, - ); - return null; - } -} - -/** - * Downloads the asset from the URL and transforms the linkBookmark to an assetBookmark - * @param url the url the user provided - * @param assetType the type of the asset we're downloading - * @param userId the id of the user - * @param jobId the id of the job for logging - * @param bookmarkId the id of the bookmark - */ -async function handleAsAssetBookmark( - url: string, - assetType: "image" | "pdf", - userId: string, - jobId: string, - bookmarkId: string, - abortSignal: AbortSignal, -) { - const downloaded = await downloadAndStoreFile( - url, - userId, - jobId, - assetType, - abortSignal, - ); - if (!downloaded) { - return; - } - const fileName = path.basename(new URL(url).pathname); - await db.transaction(async (trx) => { - await updateAsset( - undefined, - { - id: downloaded.assetId, - bookmarkId, - userId, - assetType: AssetTypes.BOOKMARK_ASSET, - contentType: downloaded.contentType, - size: downloaded.size, - fileName, - }, - 
trx, - ); - await trx.insert(bookmarkAssets).values({ - id: bookmarkId, - assetType, - assetId: downloaded.assetId, - content: null, - fileName, - sourceUrl: url, - }); - // Switch the type of the bookmark from LINK to ASSET - await trx - .update(bookmarks) - .set({ type: BookmarkTypes.ASSET }) - .where(eq(bookmarks.id, bookmarkId)); - await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); - }); - await AssetPreprocessingQueue.enqueue({ - bookmarkId, - fixMode: false, - }); -} - -async function crawlAndParseUrl( - url: string, - userId: string, - jobId: string, - bookmarkId: string, - oldScreenshotAssetId: string | undefined, - oldImageAssetId: string | undefined, - oldFullPageArchiveAssetId: string | undefined, - precrawledArchiveAssetId: string | undefined, - archiveFullPage: boolean, - abortSignal: AbortSignal, -) { - let result: { - htmlContent: string; - screenshot: Buffer | undefined; - statusCode: number | null; - url: string; - }; - - if (precrawledArchiveAssetId) { - logger.info( - `[Crawler][${jobId}] The page has been precrawled. 
Will use the precrawled archive instead.`, - ); - const asset = await readAsset({ - userId, - assetId: precrawledArchiveAssetId, - }); - result = { - htmlContent: asset.asset.toString(), - screenshot: undefined, - statusCode: 200, - url, - }; - } else { - result = await crawlPage(jobId, url, abortSignal); - } - abortSignal.throwIfAborted(); - - const { htmlContent, screenshot, statusCode, url: browserUrl } = result; - - const [meta, readableContent, screenshotAssetInfo] = await Promise.all([ - extractMetadata(htmlContent, browserUrl, jobId), - extractReadableContent(htmlContent, browserUrl, jobId), - storeScreenshot(screenshot, userId, jobId), - ]); - abortSignal.throwIfAborted(); - let imageAssetInfo: DBAssetType | null = null; - if (meta.image) { - const downloaded = await downloadAndStoreImage( - meta.image, - userId, - jobId, - abortSignal, - ); - if (downloaded) { - imageAssetInfo = { - id: downloaded.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_BANNER_IMAGE, - contentType: downloaded.contentType, - size: downloaded.size, - }; - } - } - abortSignal.throwIfAborted(); - - const parseDate = (date: string | undefined) => { - if (!date) { - return null; - } - try { - return new Date(date); - } catch (_e) { - return null; - } - }; - - // TODO(important): Restrict the size of content to store - await db.transaction(async (txn) => { - await txn - .update(bookmarkLinks) - .set({ - title: meta.title, - description: meta.description, - // Don't store data URIs as they're not valid URLs and are usually quite large - imageUrl: meta.image?.startsWith("data:") ? 
null : meta.image, - favicon: meta.logo, - content: readableContent?.textContent, - htmlContent: readableContent?.content, - crawledAt: new Date(), - crawlStatusCode: statusCode, - author: meta.author, - publisher: meta.publisher, - datePublished: parseDate(meta.datePublished), - dateModified: parseDate(meta.dateModified), - }) - .where(eq(bookmarkLinks.id, bookmarkId)); - - if (screenshotAssetInfo) { - await updateAsset( - oldScreenshotAssetId, - { - id: screenshotAssetInfo.assetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_SCREENSHOT, - contentType: screenshotAssetInfo.contentType, - size: screenshotAssetInfo.size, - fileName: screenshotAssetInfo.fileName, - }, - txn, - ); - } - if (imageAssetInfo) { - await updateAsset(oldImageAssetId, imageAssetInfo, txn); - } - }); - - // Delete the old assets if any - await Promise.all([ - silentDeleteAsset(userId, oldScreenshotAssetId), - silentDeleteAsset(userId, oldImageAssetId), - ]); - - return async () => { - if ( - !precrawledArchiveAssetId && - (serverConfig.crawler.fullPageArchive || archiveFullPage) - ) { - const { - assetId: fullPageArchiveAssetId, - size, - contentType, - } = await archiveWebpage( - htmlContent, - browserUrl, - userId, - jobId, - abortSignal, - ); - - await db.transaction(async (txn) => { - await updateAsset( - oldFullPageArchiveAssetId, - { - id: fullPageArchiveAssetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, - contentType, - size, - fileName: null, - }, - txn, - ); - }); - if (oldFullPageArchiveAssetId) { - silentDeleteAsset(userId, oldFullPageArchiveAssetId); - } - } - }; -} - -async function runCrawler(job: DequeuedJob) { - const jobId = job.id ?? 
"unknown"; - - const request = zCrawlLinkRequestSchema.safeParse(job.data); - if (!request.success) { - logger.error( - `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - return; - } - - const { bookmarkId, archiveFullPage } = request.data; - const { - url, - userId, - screenshotAssetId: oldScreenshotAssetId, - imageAssetId: oldImageAssetId, - fullPageArchiveAssetId: oldFullPageArchiveAssetId, - precrawledArchiveAssetId, - } = await getBookmarkDetails(bookmarkId); - - logger.info( - `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, - ); - validateUrl(url); - - const contentType = await getContentType(url, jobId, job.abortSignal); - - // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage - const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF; - - if (isPdf) { - await handleAsAssetBookmark( - url, - "pdf", - userId, - jobId, - bookmarkId, - job.abortSignal, - ); - } else if ( - contentType && - IMAGE_ASSET_TYPES.has(contentType) && - SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType) - ) { - await handleAsAssetBookmark( - url, - "image", - userId, - jobId, - bookmarkId, - job.abortSignal, - ); - } else { - const archivalLogic = await crawlAndParseUrl( - url, - userId, - jobId, - bookmarkId, - oldScreenshotAssetId, - oldImageAssetId, - oldFullPageArchiveAssetId, - precrawledArchiveAssetId, - archiveFullPage, - job.abortSignal, - ); - - // Enqueue openai job (if not set, assume it's true for backward compatibility) - if (job.data.runInference !== false) { - await OpenAIQueue.enqueue({ - bookmarkId, - }); - } - - // Update the search index - await triggerSearchReindex(bookmarkId); - - // Trigger a potential download of a video from the URL - await triggerVideoWorker(bookmarkId, url); - - // Trigger a webhook - await triggerWebhook(bookmarkId, "crawled"); - - // Do the archival as a separate last step as it has the potential for failure - await 
archivalLogic(); - } -} diff --git a/apps/workers/feedWorker.ts b/apps/workers/feedWorker.ts deleted file mode 100644 index 1eaba0c3..00000000 --- a/apps/workers/feedWorker.ts +++ /dev/null @@ -1,215 +0,0 @@ -import { and, eq, inArray } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import cron from "node-cron"; -import Parser from "rss-parser"; -import { buildImpersonatingTRPCClient } from "trpc"; -import { z } from "zod"; - -import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { FeedQueue } from "@karakeep/shared/queues"; -import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; - -export const FeedRefreshingWorker = cron.schedule( - "0 * * * *", - () => { - logger.info("[feed] Scheduling feed refreshing jobs ..."); - db.query.rssFeedsTable - .findMany({ - columns: { - id: true, - }, - where: eq(rssFeedsTable.enabled, true), - }) - .then((feeds) => { - for (const feed of feeds) { - FeedQueue.enqueue( - { - feedId: feed.id, - }, - { - idempotencyKey: feed.id, - }, - ); - } - }); - }, - { - runOnInit: false, - scheduled: false, - }, -); - -export class FeedWorker { - static build() { - logger.info("Starting feed worker ..."); - const worker = new Runner( - FeedQueue, - { - run: run, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[feed][${jobId}] Completed successfully`); - await db - .update(rssFeedsTable) - .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() }) - .where(eq(rssFeedsTable.id, job.data?.feedId)); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, - ); - if (job.data) { - await db - .update(rssFeedsTable) - .set({ lastFetchedStatus: "failure", lastFetchedAt: new Date() }) - .where(eq(rssFeedsTable.id, 
job.data?.feedId)); - } - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function run(req: DequeuedJob) { - const jobId = req.id; - const feed = await db.query.rssFeedsTable.findFirst({ - where: eq(rssFeedsTable.id, req.data.feedId), - }); - if (!feed) { - throw new Error( - `[feed][${jobId}] Feed with id ${req.data.feedId} not found`, - ); - } - logger.info( - `[feed][${jobId}] Starting fetching feed "${feed.name}" (${feed.id}) ...`, - ); - - const response = await fetch(feed.url, { - signal: AbortSignal.timeout(5000), - headers: { - UserAgent: - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", - Accept: "application/rss+xml", - }, - }); - if (response.status !== 200) { - throw new Error( - `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) returned a non-success status: ${response.status}.`, - ); - } - const contentType = response.headers.get("content-type"); - if (!contentType || !contentType.includes("xml")) { - throw new Error( - `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) is not a valid RSS feed`, - ); - } - const xmlData = await response.text(); - - logger.info( - `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`, - ); - - const parser = new Parser({ - customFields: { - item: ["id"], - }, - }); - const unparseFeedData = await parser.parseString(xmlData); - - // Apparently, we can't trust the output of the xml parser. So let's do our own type - // validation. - const feedItemsSchema = z.object({ - id: z.coerce.string(), - link: z.string().optional(), - guid: z.string().optional(), - }); - - const feedItems = unparseFeedData.items - .map((i) => feedItemsSchema.safeParse(i)) - .flatMap((i) => (i.success ? 
[i.data] : [])); - - logger.info( - `[feed][${jobId}] Found ${feedItems.length} entries in feed "${feed.name}" (${feed.id}) ...`, - ); - - if (feedItems.length === 0) { - logger.info(`[feed][${jobId}] No entries found.`); - return; - } - - // For feeds that don't have guids, use the link as the id - feedItems.forEach((item) => { - item.guid = item.guid ?? `${item.id}` ?? item.link; - }); - - const exitingEntries = await db.query.rssFeedImportsTable.findMany({ - where: and( - eq(rssFeedImportsTable.rssFeedId, feed.id), - inArray( - rssFeedImportsTable.entryId, - feedItems.map((item) => item.guid).filter((id): id is string => !!id), - ), - ), - }); - - const newEntries = feedItems.filter( - (item) => - !exitingEntries.some((entry) => entry.entryId === item.guid) && - item.link && - item.guid, - ); - - if (newEntries.length === 0) { - logger.info( - `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`, - ); - return; - } - - logger.info( - `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`, - ); - - const trpcClient = await buildImpersonatingTRPCClient(feed.userId); - - const createdBookmarks = await Promise.allSettled( - newEntries.map((item) => - trpcClient.bookmarks.createBookmark({ - type: BookmarkTypes.LINK, - url: item.link!, - }), - ), - ); - - // It's ok if this is not transactional as the bookmarks will get linked in the next iteration. - await db - .insert(rssFeedImportsTable) - .values( - newEntries.map((item, idx) => { - const b = createdBookmarks[idx]; - return { - entryId: item.guid!, - bookmarkId: b.status === "fulfilled" ? 
b.value.id : null, - rssFeedId: feed.id, - }; - }), - ) - .onConflictDoNothing(); - - logger.info( - `[feed][${jobId}] Successfully imported ${newEntries.length} new enteries from feed "${feed.name}" (${feed.id}).`, - ); - - return Promise.resolve(); -} diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 208666c7..1cc1ce49 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -1,20 +1,19 @@ import "dotenv/config"; -import { AssetPreprocessingWorker } from "assetPreprocessingWorker"; -import { FeedRefreshingWorker, FeedWorker } from "feedWorker"; -import { RuleEngineWorker } from "ruleEngineWorker"; -import { TidyAssetsWorker } from "tidyAssetsWorker"; - import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { runQueueDBMigrations } from "@karakeep/shared/queues"; -import { CrawlerWorker } from "./crawlerWorker"; import { shutdownPromise } from "./exit"; -import { OpenAiWorker } from "./openaiWorker"; -import { SearchIndexingWorker } from "./searchWorker"; -import { VideoWorker } from "./videoWorker"; -import { WebhookWorker } from "./webhookWorker"; +import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; +import { CrawlerWorker } from "./workers/crawlerWorker"; +import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; +import { OpenAiWorker } from "./workers/inference/inferenceWorker"; +import { RuleEngineWorker } from "./workers/ruleEngineWorker"; +import { SearchIndexingWorker } from "./workers/searchWorker"; +import { TidyAssetsWorker } from "./workers/tidyAssetsWorker"; +import { VideoWorker } from "./workers/videoWorker"; +import { WebhookWorker } from "./workers/webhookWorker"; async function main() { logger.info(`Workers version: ${serverConfig.serverVersion ?? 
"not set"}`); @@ -22,7 +21,7 @@ async function main() { const [ crawler, - openai, + inference, search, tidyAssets, video, @@ -46,7 +45,7 @@ async function main() { await Promise.any([ Promise.all([ crawler.run(), - openai.run(), + inference.run(), search.run(), tidyAssets.run(), video.run(), @@ -58,12 +57,12 @@ async function main() { shutdownPromise, ]); logger.info( - "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", + "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", ); FeedRefreshingWorker.stop(); crawler.stop(); - openai.stop(); + inference.stop(); search.stop(); tidyAssets.stop(); video.stop(); diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts deleted file mode 100644 index c8b2770e..00000000 --- a/apps/workers/openaiWorker.ts +++ /dev/null @@ -1,463 +0,0 @@ -import { and, Column, eq, inArray, sql } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import { buildImpersonatingTRPCClient } from "trpc"; -import { z } from "zod"; - -import type { InferenceClient } from "@karakeep/shared/inference"; -import type { ZOpenAIRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - bookmarks, - bookmarkTags, - customPrompts, - tagsOnBookmarks, -} from "@karakeep/db/schema"; -import { readAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import { InferenceClientFactory } from "@karakeep/shared/inference"; -import logger from "@karakeep/shared/logger"; -import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; -import { - OpenAIQueue, - triggerRuleEngineOnEvent, - triggerSearchReindex, - triggerWebhook, - zOpenAIRequestSchema, -} from "@karakeep/shared/queues"; - -const openAIResponseSchema = z.object({ - tags: z.array(z.string()), -}); - -function tagNormalizer(col: Column) { - function 
normalizeTag(tag: string) { - return tag.toLowerCase().replace(/[ \-_]/g, ""); - } - - return { - normalizeTag, - sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`, - }; -} - -async function attemptMarkTaggingStatus( - jobData: object | undefined, - status: "success" | "failure", -) { - if (!jobData) { - return; - } - try { - const request = zOpenAIRequestSchema.parse(jobData); - await db - .update(bookmarks) - .set({ - taggingStatus: status, - }) - .where(eq(bookmarks.id, request.bookmarkId)); - } catch (e) { - logger.error(`Something went wrong when marking the tagging status: ${e}`); - } -} - -export class OpenAiWorker { - static build() { - logger.info("Starting inference worker ..."); - const worker = new Runner( - OpenAIQueue, - { - run: runOpenAI, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[inference][${jobId}] Completed successfully`); - await attemptMarkTaggingStatus(job.data, "success"); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`, - ); - if (job.numRetriesLeft == 0) { - await attemptMarkTaggingStatus(job?.data, "failure"); - } - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: serverConfig.inference.jobTimeoutSec, - }, - ); - - return worker; - } -} - -async function buildPrompt( - bookmark: NonNullable>>, -) { - const prompts = await fetchCustomPrompts(bookmark.userId, "text"); - if (bookmark.link) { - if (!bookmark.link.description && !bookmark.link.content) { - throw new Error( - `No content found for link "${bookmark.id}". Skipping ...`, - ); - } - - const content = bookmark.link.content; - return buildTextPrompt( - serverConfig.inference.inferredTagLang, - prompts, - `URL: ${bookmark.link.url} -Title: ${bookmark.link.title ?? ""} -Description: ${bookmark.link.description ?? ""} -Content: ${content ?? 
""}`, - serverConfig.inference.contextLength, - ); - } - - if (bookmark.text) { - return buildTextPrompt( - serverConfig.inference.inferredTagLang, - prompts, - bookmark.text.text ?? "", - serverConfig.inference.contextLength, - ); - } - - throw new Error("Unknown bookmark type"); -} - -async function fetchBookmark(linkId: string) { - return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, linkId), - with: { - link: true, - text: true, - asset: true, - }, - }); -} - -async function inferTagsFromImage( - jobId: string, - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - const { asset, metadata } = await readAsset({ - userId: bookmark.userId, - assetId: bookmark.asset.assetId, - }); - - if (!asset) { - throw new Error( - `[inference][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmark.id} not found`, - ); - } - - const base64 = asset.toString("base64"); - return inferenceClient.inferFromImage( - buildImagePrompt( - serverConfig.inference.inferredTagLang, - await fetchCustomPrompts(bookmark.userId, "images"), - ), - metadata.contentType, - base64, - { schema: openAIResponseSchema, abortSignal }, - ); -} - -async function fetchCustomPrompts( - userId: string, - appliesTo: "text" | "images", -) { - const prompts = await db.query.customPrompts.findMany({ - where: and( - eq(customPrompts.userId, userId), - inArray(customPrompts.appliesTo, ["all_tagging", appliesTo]), - ), - columns: { - text: true, - }, - }); - - let promptTexts = prompts.map((p) => p.text); - if (containsTagsPlaceholder(prompts)) { - promptTexts = await replaceTagsPlaceholders(promptTexts, userId); - } - - return promptTexts; -} - -async function replaceTagsPlaceholders( - prompts: string[], - userId: string, -): Promise { - const api = await buildImpersonatingTRPCClient(userId); - const tags = (await api.tags.list()).tags; - const tagsString = `[${tags.map((tag) => tag.name).join(", ")}]`; - const aiTagsString = `[${tags - 
.filter((tag) => tag.numBookmarksByAttachedType.human ?? 0 == 0) - .map((tag) => tag.name) - .join(", ")}]`; - const userTagsString = `[${tags - .filter((tag) => tag.numBookmarksByAttachedType.human ?? 0 > 0) - .map((tag) => tag.name) - .join(", ")}]`; - - return prompts.map((p) => - p - .replaceAll("$tags", tagsString) - .replaceAll("$aiTags", aiTagsString) - .replaceAll("$userTags", userTagsString), - ); -} - -function containsTagsPlaceholder(prompts: { text: string }[]): boolean { - return ( - prompts.filter( - (p) => - p.text.includes("$tags") || - p.text.includes("$aiTags") || - p.text.includes("$userTags"), - ).length > 0 - ); -} - -async function inferTagsFromPDF( - _jobId: string, - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - const prompt = buildTextPrompt( - serverConfig.inference.inferredTagLang, - await fetchCustomPrompts(bookmark.userId, "text"), - `Content: ${bookmark.asset.content}`, - serverConfig.inference.contextLength, - ); - return inferenceClient.inferFromText(prompt, { - schema: openAIResponseSchema, - abortSignal, - }); -} - -async function inferTagsFromText( - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - return await inferenceClient.inferFromText(await buildPrompt(bookmark), { - schema: openAIResponseSchema, - abortSignal, - }); -} - -async function inferTags( - jobId: string, - bookmark: NonNullable>>, - inferenceClient: InferenceClient, - abortSignal: AbortSignal, -) { - let response; - if (bookmark.link || bookmark.text) { - response = await inferTagsFromText(bookmark, inferenceClient, abortSignal); - } else if (bookmark.asset) { - switch (bookmark.asset.assetType) { - case "image": - response = await inferTagsFromImage( - jobId, - bookmark, - inferenceClient, - abortSignal, - ); - break; - case "pdf": - response = await inferTagsFromPDF( - jobId, - bookmark, - inferenceClient, - abortSignal, - ); - break; - default: - throw new 
Error(`[inference][${jobId}] Unsupported bookmark type`); - } - } else { - throw new Error(`[inference][${jobId}] Unsupported bookmark type`); - } - - if (!response) { - throw new Error(`[inference][${jobId}] Inference response is empty`); - } - - try { - let tags = openAIResponseSchema.parse(JSON.parse(response.response)).tags; - logger.info( - `[inference][${jobId}] Inferring tag for bookmark "${bookmark.id}" used ${response.totalTokens} tokens and inferred: ${tags}`, - ); - - // Sometimes the tags contain the hashtag symbol, let's strip them out if they do. - // Additionally, trim the tags to prevent whitespaces at the beginning/the end of the tag. - tags = tags.map((t) => { - let tag = t; - if (tag.startsWith("#")) { - tag = t.slice(1); - } - return tag.trim(); - }); - - return tags; - } catch (e) { - const responseSneak = response.response.substring(0, 20); - throw new Error( - `[inference][${jobId}] The model ignored our prompt and didn't respond with the expected JSON: ${JSON.stringify(e)}. 
Here's a sneak peak from the response: ${responseSneak}`, - ); - } -} - -async function connectTags( - bookmarkId: string, - inferredTags: string[], - userId: string, -) { - if (inferredTags.length == 0) { - return; - } - - await db.transaction(async (tx) => { - // Attempt to match exiting tags with the new ones - const { matchedTagIds, notFoundTagNames } = await (async () => { - const { normalizeTag, sql: normalizedTagSql } = tagNormalizer( - bookmarkTags.name, - ); - const normalizedInferredTags = inferredTags.map((t) => ({ - originalTag: t, - normalizedTag: normalizeTag(t), - })); - - const matchedTags = await tx.query.bookmarkTags.findMany({ - where: and( - eq(bookmarkTags.userId, userId), - inArray( - normalizedTagSql, - normalizedInferredTags.map((t) => t.normalizedTag), - ), - ), - }); - - const matchedTagIds = matchedTags.map((r) => r.id); - const notFoundTagNames = normalizedInferredTags - .filter( - (t) => - !matchedTags.some( - (mt) => normalizeTag(mt.name) === t.normalizedTag, - ), - ) - .map((t) => t.originalTag); - - return { matchedTagIds, notFoundTagNames }; - })(); - - // Create tags that didn't exist previously - let newTagIds: string[] = []; - if (notFoundTagNames.length > 0) { - newTagIds = ( - await tx - .insert(bookmarkTags) - .values( - notFoundTagNames.map((t) => ({ - name: t, - userId, - })), - ) - .onConflictDoNothing() - .returning() - ).map((t) => t.id); - } - - // Delete old AI tags - const detachedTags = await tx - .delete(tagsOnBookmarks) - .where( - and( - eq(tagsOnBookmarks.attachedBy, "ai"), - eq(tagsOnBookmarks.bookmarkId, bookmarkId), - ), - ) - .returning(); - - const allTagIds = new Set([...matchedTagIds, ...newTagIds]); - - // Attach new ones - const attachedTags = await tx - .insert(tagsOnBookmarks) - .values( - [...allTagIds].map((tagId) => ({ - tagId, - bookmarkId, - attachedBy: "ai" as const, - })), - ) - .onConflictDoNothing() - .returning(); - - await triggerRuleEngineOnEvent(bookmarkId, [ - ...detachedTags.map((t) => ({ 
- type: "tagRemoved" as const, - tagId: t.tagId, - })), - ...attachedTags.map((t) => ({ - type: "tagAdded" as const, - tagId: t.tagId, - })), - ]); - }); -} - -async function runOpenAI(job: DequeuedJob) { - const jobId = job.id; - - const inferenceClient = InferenceClientFactory.build(); - if (!inferenceClient) { - logger.debug( - `[inference][${jobId}] No inference client configured, nothing to do now`, - ); - return; - } - - const request = zOpenAIRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - const { bookmarkId } = request.data; - const bookmark = await fetchBookmark(bookmarkId); - if (!bookmark) { - throw new Error( - `[inference][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - - logger.info( - `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`, - ); - - const tags = await inferTags( - jobId, - bookmark, - inferenceClient, - job.abortSignal, - ); - - await connectTags(bookmarkId, tags, bookmark.userId); - - // Trigger a webhook - await triggerWebhook(bookmarkId, "ai tagged"); - - // Update the search index - await triggerSearchReindex(bookmarkId); -} diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/ruleEngineWorker.ts deleted file mode 100644 index 427cc383..00000000 --- a/apps/workers/ruleEngineWorker.ts +++ /dev/null @@ -1,86 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import { buildImpersonatingAuthedContext } from "trpc"; - -import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { - RuleEngineQueue, - zRuleEngineRequestSchema, -} from "@karakeep/shared/queues"; -import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; - -export class RuleEngineWorker { - static 
build() { - logger.info("Starting rule engine worker ..."); - const worker = new Runner( - RuleEngineQueue, - { - run: runRuleEngine, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[ruleEngine][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 10, - validator: zRuleEngineRequestSchema, - }, - ); - - return worker; - } -} - -async function getBookmarkUserId(bookmarkId: string) { - return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - columns: { - userId: true, - }, - }); -} - -async function runRuleEngine(job: DequeuedJob) { - const jobId = job.id; - const { bookmarkId, events } = job.data; - - const bookmark = await getBookmarkUserId(bookmarkId); - if (!bookmark) { - throw new Error( - `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - const userId = bookmark.userId; - const authedCtx = await buildImpersonatingAuthedContext(userId); - - const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); - - const results = ( - await Promise.all(events.map((event) => ruleEngine.onEvent(event))) - ).flat(); - - if (results.length == 0) { - return; - } - - const message = results - .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`) - .join("\n"); - - logger.info( - `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`, - ); -} diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts deleted file mode 100644 index e7b827a9..00000000 --- a/apps/workers/searchWorker.ts +++ /dev/null @@ -1,156 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; - -import type { ZSearchIndexingRequest } from 
"@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import logger from "@karakeep/shared/logger"; -import { - SearchIndexingQueue, - zSearchIndexingRequestSchema, -} from "@karakeep/shared/queues"; -import { getSearchIdxClient } from "@karakeep/shared/search"; - -export class SearchIndexingWorker { - static build() { - logger.info("Starting search indexing worker ..."); - const worker = new Runner( - SearchIndexingQueue, - { - run: runSearchIndexing, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[search][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function ensureTaskSuccess( - searchClient: NonNullable>>, - taskUid: number, -) { - const task = await searchClient.waitForTask(taskUid); - if (task.error) { - throw new Error(`Search task failed: ${task.error.message}`); - } -} - -async function runIndex( - searchClient: NonNullable>>, - bookmarkId: string, -) { - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - link: true, - text: true, - asset: true, - tagsOnBookmarks: { - with: { - tag: true, - }, - }, - }, - }); - - if (!bookmark) { - throw new Error(`Bookmark ${bookmarkId} not found`); - } - - const task = await searchClient.addDocuments( - [ - { - id: bookmark.id, - userId: bookmark.userId, - ...(bookmark.link - ? 
{ - url: bookmark.link.url, - linkTitle: bookmark.link.title, - description: bookmark.link.description, - content: bookmark.link.content, - publisher: bookmark.link.publisher, - author: bookmark.link.author, - datePublished: bookmark.link.datePublished, - dateModified: bookmark.link.dateModified, - } - : undefined), - ...(bookmark.asset - ? { - content: bookmark.asset.content, - metadata: bookmark.asset.metadata, - } - : undefined), - ...(bookmark.text ? { content: bookmark.text.text } : undefined), - note: bookmark.note, - summary: bookmark.summary, - title: bookmark.title, - createdAt: bookmark.createdAt.toISOString(), - tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), - }, - ], - { - primaryKey: "id", - }, - ); - await ensureTaskSuccess(searchClient, task.taskUid); -} - -async function runDelete( - searchClient: NonNullable>>, - bookmarkId: string, -) { - const task = await searchClient.deleteDocument(bookmarkId); - await ensureTaskSuccess(searchClient, task.taskUid); -} - -async function runSearchIndexing(job: DequeuedJob) { - const jobId = job.id; - - const request = zSearchIndexingRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[search][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - const searchClient = await getSearchIdxClient(); - if (!searchClient) { - logger.debug( - `[search][${jobId}] Search is not configured, nothing to do now`, - ); - return; - } - - const bookmarkId = request.data.bookmarkId; - logger.info( - `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`, - ); - - switch (request.data.type) { - case "index": { - await runIndex(searchClient, bookmarkId); - break; - } - case "delete": { - await runDelete(searchClient, bookmarkId); - break; - } - } -} diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/tidyAssetsWorker.ts deleted file mode 100644 index d4c8abdb..00000000 --- a/apps/workers/tidyAssetsWorker.ts +++ /dev/null @@ -1,107 +0,0 @@ 
-import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; - -import { db } from "@karakeep/db"; -import { assets } from "@karakeep/db/schema"; -import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; -import logger from "@karakeep/shared/logger"; -import { - TidyAssetsQueue, - ZTidyAssetsRequest, - zTidyAssetsRequestSchema, -} from "@karakeep/shared/queues"; - -export class TidyAssetsWorker { - static build() { - logger.info("Starting tidy assets worker ..."); - const worker = new Runner( - TidyAssetsQueue, - { - run: runTidyAssets, - onComplete: (job) => { - const jobId = job.id; - logger.info(`[tidyAssets][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: (job) => { - const jobId = job.id; - logger.error( - `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function handleAsset( - asset: { - assetId: string; - userId: string; - size: number; - contentType: string; - fileName?: string | null; - }, - request: ZTidyAssetsRequest, - jobId: string, -) { - const dbRow = await db.query.assets.findFirst({ - where: eq(assets.id, asset.assetId), - }); - if (!dbRow) { - if (request.cleanDanglingAssets) { - await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); - logger.info( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, - ); - } else { - logger.warn( - `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. 
Not deleting it because cleanDanglingAssets is false.`, - ); - } - return; - } - - if (request.syncAssetMetadata) { - await db - .update(assets) - .set({ - contentType: asset.contentType, - fileName: asset.fileName, - size: asset.size, - }) - .where(eq(assets.id, asset.assetId)); - logger.info( - `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`, - ); - } -} - -async function runTidyAssets(job: DequeuedJob) { - const jobId = job.id; - - const request = zTidyAssetsRequestSchema.safeParse(job.data); - if (!request.success) { - throw new Error( - `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`, - ); - } - - for await (const asset of getAllAssets()) { - try { - handleAsset(asset, request.data, jobId); - } catch (e) { - logger.error( - `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`, - ); - } - } -} diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts deleted file mode 100644 index b8f85ddf..00000000 --- a/apps/workers/videoWorker.ts +++ /dev/null @@ -1,214 +0,0 @@ -import fs from "fs"; -import * as os from "os"; -import path from "path"; -import { execa } from "execa"; -import { DequeuedJob, Runner } from "liteque"; - -import { db } from "@karakeep/db"; -import { AssetTypes } from "@karakeep/db/schema"; -import { - ASSET_TYPES, - getAssetSize, - newAssetId, - saveAssetFromFile, - silentDeleteAsset, -} from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - VideoWorkerQueue, - ZVideoRequest, - zvideoRequestSchema, -} from "@karakeep/shared/queues"; - -import { withTimeout } from "./utils"; -import { getBookmarkDetails, updateAsset } from "./workerUtils"; - -const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads"); - -export class VideoWorker { - static build() { - logger.info("Starting video worker ..."); - - return new Runner( - VideoWorkerQueue, - { - run: withTimeout( - runWorker, - /* 
timeoutSec */ serverConfig.crawler.downloadVideoTimeout, - ), - onComplete: async (job) => { - const jobId = job.id; - logger.info( - `[VideoCrawler][${jobId}] Video Download Completed successfully`, - ); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`, - ); - return Promise.resolve(); - }, - }, - { - pollIntervalMs: 1000, - timeoutSecs: serverConfig.crawler.downloadVideoTimeout, - concurrency: 1, - validator: zvideoRequestSchema, - }, - ); - } -} - -function prepareYtDlpArguments(url: string, assetPath: string) { - const ytDlpArguments = [url]; - if (serverConfig.crawler.maxVideoDownloadSize > 0) { - ytDlpArguments.push( - "-f", - `best[filesize<${serverConfig.crawler.maxVideoDownloadSize}M]`, - ); - } - - ytDlpArguments.push(...serverConfig.crawler.ytDlpArguments); - ytDlpArguments.push("-o", assetPath); - ytDlpArguments.push("--no-playlist"); - return ytDlpArguments; -} - -async function runWorker(job: DequeuedJob) { - const jobId = job.id; - const { bookmarkId } = job.data; - - const { - url, - userId, - videoAssetId: oldVideoAssetId, - } = await getBookmarkDetails(bookmarkId); - - if (!serverConfig.crawler.downloadVideo) { - logger.info( - `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it is disabled in the config.`, - ); - return; - } - - const videoAssetId = newAssetId(); - let assetPath = `${TMP_FOLDER}/${videoAssetId}`; - await fs.promises.mkdir(TMP_FOLDER, { recursive: true }); - - const ytDlpArguments = prepareYtDlpArguments(url, assetPath); - - try { - logger.info( - `[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`, - ); - - await execa("yt-dlp", ytDlpArguments, { - cancelSignal: job.abortSignal, - }); - const downloadPath = await findAssetFile(videoAssetId); - if (!downloadPath) { - logger.info( - 
"[VideoCrawler][${jobId}] yt-dlp didn't download anything. Skipping ...", - ); - return; - } - assetPath = downloadPath; - } catch (e) { - const err = e as Error; - if ( - err.message.includes("ERROR: Unsupported URL:") || - err.message.includes("No media found") - ) { - logger.info( - `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it's not one of the supported yt-dlp URLs`, - ); - return; - } - logger.error( - `[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`, - ); - await deleteLeftOverAssetFile(jobId, videoAssetId); - return; - } - - logger.info( - `[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`, - ); - await saveAssetFromFile({ - userId, - assetId: videoAssetId, - assetPath, - metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, - }); - - await db.transaction(async (txn) => { - await updateAsset( - oldVideoAssetId, - { - id: videoAssetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_VIDEO, - contentType: ASSET_TYPES.VIDEO_MP4, - size: await getAssetSize({ userId, assetId: videoAssetId }), - }, - txn, - ); - }); - await silentDeleteAsset(userId, oldVideoAssetId); - - logger.info( - `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, - ); -} - -/** - * Deletes leftover assets in case the download fails - * - * @param jobId the id of the job - * @param assetId the id of the asset to delete - */ -async function deleteLeftOverAssetFile( - jobId: string, - assetId: string, -): Promise { - let assetFile; - try { - assetFile = await findAssetFile(assetId); - } catch { - // ignore exception, no asset file was found - return; - } - if (!assetFile) { - return; - } - logger.info( - `[VideoCrawler][${jobId}] Deleting leftover video asset "${assetFile}".`, - ); - try { - await fs.promises.rm(assetFile); - } catch (e) { - logger.error( - `[VideoCrawler][${jobId}] Failed deleting leftover video asset "${assetFile}".`, - ); - } 
-} - -/** - * yt-dlp automatically adds a file ending to the passed in filename --> we have to search it again in the folder - * - * @param assetId the id of the asset to search - * @returns the path to the downloaded asset - */ -async function findAssetFile(assetId: string): Promise { - const files = await fs.promises.readdir(TMP_FOLDER); - for (const file of files) { - if (file.startsWith(assetId)) { - return path.join(TMP_FOLDER, file); - } - } - return null; -} diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts deleted file mode 100644 index 9d3ed2c1..00000000 --- a/apps/workers/webhookWorker.ts +++ /dev/null @@ -1,146 +0,0 @@ -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import fetch from "node-fetch"; - -import { db } from "@karakeep/db"; -import { bookmarks } from "@karakeep/db/schema"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - WebhookQueue, - ZWebhookRequest, - zWebhookRequestSchema, -} from "@karakeep/shared/queues"; - -export class WebhookWorker { - static build() { - logger.info("Starting webhook worker ..."); - const worker = new Runner( - WebhookQueue, - { - run: runWebhook, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[webhook][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: - serverConfig.webhook.timeoutSec * - (serverConfig.webhook.retryTimes + 1) + - 1, //consider retry times, and timeout and add 1 second for other stuff - validator: zWebhookRequestSchema, - }, - ); - - return worker; - } -} - -async function fetchBookmark(bookmarkId: string) { - return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), 
- with: { - link: { - columns: { - url: true, - }, - }, - user: { - columns: {}, - with: { - webhooks: true, - }, - }, - }, - }); -} - -async function runWebhook(job: DequeuedJob) { - const jobId = job.id; - const webhookTimeoutSec = serverConfig.webhook.timeoutSec; - - const { bookmarkId } = job.data; - const bookmark = await fetchBookmark(bookmarkId); - if (!bookmark) { - throw new Error( - `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`, - ); - } - - if (!bookmark.user.webhooks) { - return; - } - - logger.info( - `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id} for operation "${job.data.operation}"`, - ); - - await Promise.allSettled( - bookmark.user.webhooks - .filter((w) => w.events.includes(job.data.operation)) - .map(async (webhook) => { - const url = webhook.url; - const webhookToken = webhook.token; - const maxRetries = serverConfig.webhook.retryTimes; - let attempt = 0; - let success = false; - - while (attempt < maxRetries && !success) { - try { - const response = await fetch(url, { - method: "POST", - headers: { - "Content-Type": "application/json", - ...(webhookToken - ? { - Authorization: `Bearer ${webhookToken}`, - } - : {}), - }, - body: JSON.stringify({ - jobId, - bookmarkId, - userId: bookmark.userId, - url: bookmark.link ? 
bookmark.link.url : undefined, - type: bookmark.type, - operation: job.data.operation, - }), - signal: AbortSignal.timeout(webhookTimeoutSec * 1000), - }); - - if (!response.ok) { - logger.error( - `Webhook call to ${url} failed with status: ${response.status}`, - ); - } else { - logger.info( - `[webhook][${jobId}] Webhook to ${url} call succeeded`, - ); - success = true; - } - } catch (error) { - logger.error( - `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`, - ); - } - attempt++; - if (!success && attempt < maxRetries) { - logger.info( - `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`, - ); - } - } - }), - ); -} diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts new file mode 100644 index 00000000..0c0b7aec --- /dev/null +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -0,0 +1,339 @@ +import os from "os"; +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import PDFParser from "pdf2json"; +import { fromBuffer } from "pdf2pic"; +import { createWorker } from "tesseract.js"; + +import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { + assets, + AssetTypes, + bookmarkAssets, + bookmarks, +} from "@karakeep/db/schema"; +import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + AssetPreprocessingQueue, + OpenAIQueue, + triggerSearchReindex, +} from "@karakeep/shared/queues"; + +export class AssetPreprocessingWorker { + static build() { + logger.info("Starting asset preprocessing worker ..."); + const worker = new Runner( + AssetPreprocessingQueue, + { + run: run, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: 
async (job) => { + const jobId = job.id; + logger.error( + `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function readImageText(buffer: Buffer) { + if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") { + return null; + } + const worker = await createWorker(serverConfig.ocr.langs, undefined, { + cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(), + }); + try { + const ret = await worker.recognize(buffer); + if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) { + return null; + } + return ret.data.text; + } finally { + await worker.terminate(); + } +} + +async function readPDFText(buffer: Buffer): Promise<{ + text: string; + metadata: Record; +}> { + return new Promise((resolve, reject) => { + const pdfParser = new PDFParser(null, true); + pdfParser.on("pdfParser_dataError", reject); + pdfParser.on("pdfParser_dataReady", (pdfData) => { + resolve({ + text: pdfParser.getRawTextContent(), + metadata: pdfData.Meta, + }); + }); + pdfParser.parseBuffer(buffer); + }); +} + +export async function extractAndSavePDFScreenshot( + jobId: string, + asset: Buffer, + bookmark: NonNullable>>, + isFixMode: boolean, +): Promise { + { + const alreadyHasScreenshot = + bookmark.assets.find( + (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT, + ) !== undefined; + if (alreadyHasScreenshot && isFixMode) { + logger.info( + `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`, + ); + return false; + } + } + logger.info( + `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`, + ); + try { + /** + * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL + * 
https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md + */ + const screenshot = await fromBuffer(asset, { + density: 100, + quality: 100, + format: "png", + preserveAspectRatio: true, + })(1, { responseType: "buffer" }); + + if (!screenshot.buffer) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`, + ); + return false; + } + + // Store the screenshot + const assetId = newAssetId(); + const fileName = "screenshot.png"; + const contentType = "image/png"; + await saveAsset({ + userId: bookmark.userId, + assetId, + asset: screenshot.buffer, + metadata: { + contentType, + fileName, + }, + }); + + // Insert into database + await db.insert(assets).values({ + id: assetId, + bookmarkId: bookmark.id, + userId: bookmark.userId, + assetType: AssetTypes.ASSET_SCREENSHOT, + contentType, + size: screenshot.buffer.byteLength, + fileName, + }); + + logger.info( + `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`, + ); + return true; + } catch (error) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, + ); + return false; + } +} + +async function extractAndSaveImageText( + jobId: string, + asset: Buffer, + bookmark: NonNullable>>, + isFixMode: boolean, +): Promise { + { + const alreadyHasText = !!bookmark.asset.content; + if (alreadyHasText && isFixMode) { + logger.info( + `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`, + ); + return false; + } + } + let imageText = null; + logger.info( + `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, + ); + try { + imageText = await readImageText(asset); + } catch (e) { + logger.error( + `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, + ); + } + if (!imageText) { + return false; + } + + logger.info( + `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`, + ); + await db + 
.update(bookmarkAssets) + .set({ + content: imageText, + metadata: null, + }) + .where(eq(bookmarkAssets.id, bookmark.id)); + return true; +} + +async function extractAndSavePDFText( + jobId: string, + asset: Buffer, + bookmark: NonNullable>>, + isFixMode: boolean, +): Promise { + { + const alreadyHasText = !!bookmark.asset.content; + if (alreadyHasText && isFixMode) { + logger.info( + `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`, + ); + return false; + } + } + logger.info( + `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`, + ); + const pdfParse = await readPDFText(asset); + if (!pdfParse?.text) { + throw new Error( + `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`, + ); + } + logger.info( + `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`, + ); + await db + .update(bookmarkAssets) + .set({ + content: pdfParse.text, + metadata: pdfParse.metadata ? 
JSON.stringify(pdfParse.metadata) : null, + }) + .where(eq(bookmarkAssets.id, bookmark.id)); + return true; +} + +async function getBookmark(bookmarkId: string) { + return db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + asset: true, + assets: true, + }, + }); +} + +async function run(req: DequeuedJob) { + const isFixMode = req.data.fixMode; + const jobId = req.id; + const bookmarkId = req.data.bookmarkId; + + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + asset: true, + assets: true, + }, + }); + + logger.info( + `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`, + ); + + if (!bookmark) { + throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`); + } + + if (!bookmark.asset) { + throw new Error( + `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`, + ); + } + + const { asset } = await readAsset({ + userId: bookmark.userId, + assetId: bookmark.asset.assetId, + }); + + if (!asset) { + throw new Error( + `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`, + ); + } + + let anythingChanged = false; + switch (bookmark.asset.assetType) { + case "image": { + const extractedText = await extractAndSaveImageText( + jobId, + asset, + bookmark, + isFixMode, + ); + anythingChanged ||= extractedText; + break; + } + case "pdf": { + const extractedText = await extractAndSavePDFText( + jobId, + asset, + bookmark, + isFixMode, + ); + const extractedScreenshot = await extractAndSavePDFScreenshot( + jobId, + asset, + bookmark, + isFixMode, + ); + anythingChanged ||= extractedText || extractedScreenshot; + break; + } + default: + throw new Error( + `[assetPreprocessing][${jobId}] Unsupported bookmark type`, + ); + } + + if (!isFixMode || anythingChanged) { + await OpenAIQueue.enqueue({ + bookmarkId, + type: "tag", + }); + + // Update the search 
index + await triggerSearchReindex(bookmarkId); + } +} diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts new file mode 100644 index 00000000..b928e145 --- /dev/null +++ b/apps/workers/workers/crawlerWorker.ts @@ -0,0 +1,882 @@ +import * as dns from "dns"; +import { promises as fs } from "fs"; +import * as path from "node:path"; +import * as os from "os"; +import type { Browser } from "puppeteer"; +import { PuppeteerBlocker } from "@ghostery/adblocker-puppeteer"; +import { Readability } from "@mozilla/readability"; +import { Mutex } from "async-mutex"; +import DOMPurify from "dompurify"; +import { eq } from "drizzle-orm"; +import { execa } from "execa"; +import { isShuttingDown } from "exit"; +import { JSDOM } from "jsdom"; +import { DequeuedJob, Runner } from "liteque"; +import metascraper from "metascraper"; +import metascraperAmazon from "metascraper-amazon"; +import metascraperAuthor from "metascraper-author"; +import metascraperDate from "metascraper-date"; +import metascraperDescription from "metascraper-description"; +import metascraperImage from "metascraper-image"; +import metascraperLogo from "metascraper-logo-favicon"; +import metascraperPublisher from "metascraper-publisher"; +import metascraperReadability from "metascraper-readability"; +import metascraperTitle from "metascraper-title"; +import metascraperTwitter from "metascraper-twitter"; +import metascraperUrl from "metascraper-url"; +import fetch from "node-fetch"; +import puppeteer from "puppeteer-extra"; +import StealthPlugin from "puppeteer-extra-plugin-stealth"; +import { withTimeout } from "utils"; +import { getBookmarkDetails, updateAsset } from "workerUtils"; + +import type { ZCrawlLinkRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { + assets, + AssetTypes, + bookmarkAssets, + bookmarkLinks, + bookmarks, +} from "@karakeep/db/schema"; +import { + ASSET_TYPES, + getAssetSize, + IMAGE_ASSET_TYPES, + newAssetId, + 
readAsset, + saveAsset, + saveAssetFromFile, + silentDeleteAsset, + SUPPORTED_UPLOAD_ASSET_TYPES, +} from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + AssetPreprocessingQueue, + LinkCrawlerQueue, + OpenAIQueue, + triggerSearchReindex, + triggerVideoWorker, + triggerWebhook, + zCrawlLinkRequestSchema, +} from "@karakeep/shared/queues"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +const metascraperParser = metascraper([ + metascraperDate({ + dateModified: true, + datePublished: true, + }), + metascraperAmazon(), + metascraperReadability(), + metascraperAuthor(), + metascraperPublisher(), + metascraperTitle(), + metascraperDescription(), + metascraperTwitter(), + metascraperImage(), + metascraperLogo(), + metascraperUrl(), +]); + +let globalBrowser: Browser | undefined; +let globalBlocker: PuppeteerBlocker | undefined; +// Guards the interactions with the browser instance. +// This is needed given that most of the browser APIs are async. 
+const browserMutex = new Mutex(); + +async function startBrowserInstance() { + const defaultViewport = { + width: 1440, + height: 900, + }; + if (serverConfig.crawler.browserWebSocketUrl) { + logger.info( + `[Crawler] Connecting to existing browser websocket address: ${serverConfig.crawler.browserWebSocketUrl}`, + ); + return puppeteer.connect({ + browserWSEndpoint: serverConfig.crawler.browserWebSocketUrl, + defaultViewport, + }); + } else if (serverConfig.crawler.browserWebUrl) { + logger.info( + `[Crawler] Connecting to existing browser instance: ${serverConfig.crawler.browserWebUrl}`, + ); + const webUrl = new URL(serverConfig.crawler.browserWebUrl); + // We need to resolve the ip address as a workaround for https://github.com/puppeteer/puppeteer/issues/2242 + const { address: address } = await dns.promises.lookup(webUrl.hostname); + webUrl.hostname = address; + logger.info( + `[Crawler] Successfully resolved IP address, new address: ${webUrl.toString()}`, + ); + return puppeteer.connect({ + browserURL: webUrl.toString(), + defaultViewport, + }); + } else { + logger.info(`Running in browserless mode`); + return undefined; + } +} + +async function launchBrowser() { + globalBrowser = undefined; + await browserMutex.runExclusive(async () => { + try { + globalBrowser = await startBrowserInstance(); + } catch (e) { + logger.error( + `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${(e as Error).stack}`, + ); + if (isShuttingDown) { + logger.info("[Crawler] We're shutting down so won't retry."); + return; + } + setTimeout(() => { + launchBrowser(); + }, 5000); + return; + } + globalBrowser?.on("disconnected", () => { + if (isShuttingDown) { + logger.info( + "[Crawler] The puppeteer browser got disconnected. But we're shutting down so won't restart it.", + ); + return; + } + logger.info( + "[Crawler] The puppeteer browser got disconnected. 
Will attempt to launch it again.", + ); + launchBrowser(); + }); + }); +} + +export class CrawlerWorker { + static async build() { + puppeteer.use(StealthPlugin()); + if (serverConfig.crawler.enableAdblocker) { + try { + logger.info("[crawler] Loading adblocker ..."); + globalBlocker = await PuppeteerBlocker.fromPrebuiltFull(fetch, { + path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), + read: fs.readFile, + write: fs.writeFile, + }); + } catch (e) { + logger.error( + `[crawler] Failed to load adblocker. Will not be blocking ads: ${e}`, + ); + } + } + if (!serverConfig.crawler.browserConnectOnDemand) { + await launchBrowser(); + } else { + logger.info( + "[Crawler] Browser connect on demand is enabled, won't proactively start the browser instance", + ); + } + + logger.info("Starting crawler worker ..."); + const worker = new Runner( + LinkCrawlerQueue, + { + run: withTimeout( + runCrawler, + /* timeoutSec */ serverConfig.crawler.jobTimeoutSec, + ), + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[Crawler][${jobId}] Completed successfully`); + const bookmarkId = job.data.bookmarkId; + if (bookmarkId) { + await changeBookmarkStatus(bookmarkId, "success"); + } + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`, + ); + const bookmarkId = job.data?.bookmarkId; + if (bookmarkId && job.numRetriesLeft == 0) { + await changeBookmarkStatus(bookmarkId, "failure"); + } + }, + }, + { + pollIntervalMs: 1000, + timeoutSecs: serverConfig.crawler.jobTimeoutSec, + concurrency: serverConfig.crawler.numWorkers, + }, + ); + + return worker; + } +} + +type DBAssetType = typeof assets.$inferInsert; + +async function changeBookmarkStatus( + bookmarkId: string, + crawlStatus: "success" | "failure", +) { + await db + .update(bookmarkLinks) + .set({ + crawlStatus, + }) + .where(eq(bookmarkLinks.id, bookmarkId)); +} + +/** + * This provides some "basic" protection 
from malicious URLs. However, all of those + * can be easily circumvented by pointing dns of origin to localhost, or with + * redirects. + */ +function validateUrl(url: string) { + const urlParsed = new URL(url); + if (urlParsed.protocol != "http:" && urlParsed.protocol != "https:") { + throw new Error(`Unsupported URL protocol: ${urlParsed.protocol}`); + } + + if (["localhost", "127.0.0.1", "0.0.0.0"].includes(urlParsed.hostname)) { + throw new Error(`Link hostname rejected: ${urlParsed.hostname}`); + } +} + +async function browserlessCrawlPage( + jobId: string, + url: string, + abortSignal: AbortSignal, +) { + logger.info( + `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`, + ); + const response = await fetch(url, { + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + logger.info( + `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`, + ); + return { + htmlContent: await response.text(), + statusCode: response.status, + screenshot: undefined, + url: response.url, + }; +} + +async function crawlPage( + jobId: string, + url: string, + abortSignal: AbortSignal, +): Promise<{ + htmlContent: string; + screenshot: Buffer | undefined; + statusCode: number; + url: string; +}> { + let browser: Browser | undefined; + if (serverConfig.crawler.browserConnectOnDemand) { + browser = await startBrowserInstance(); + } else { + browser = globalBrowser; + } + if (!browser) { + return browserlessCrawlPage(jobId, url, abortSignal); + } + const context = await browser.createBrowserContext(); + + try { + const page = await context.newPage(); + if (globalBlocker) { + await globalBlocker.enableBlockingInPage(page); + } + await page.setUserAgent( + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + ); + + const response = await page.goto(url, { + timeout: 
serverConfig.crawler.navigateTimeoutSec * 1000, + }); + logger.info( + `[Crawler][${jobId}] Successfully navigated to "${url}". Waiting for the page to load ...`, + ); + + // Wait until there's at most two connections for 2 seconds + // Attempt to wait only for 5 seconds + await Promise.race([ + page.waitForNetworkIdle({ + idleTime: 1000, // 1 sec + concurrency: 2, + }), + new Promise((f) => setTimeout(f, 5000)), + ]); + + logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`); + + const htmlContent = await page.content(); + logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`); + + let screenshot: Buffer | undefined = undefined; + if (serverConfig.crawler.storeScreenshot) { + try { + screenshot = await Promise.race([ + page.screenshot({ + // If you change this, you need to change the asset type in the store function. + type: "png", + encoding: "binary", + fullPage: serverConfig.crawler.fullPageScreenshot, + }), + new Promise((_, reject) => + setTimeout( + () => + reject( + "TIMED_OUT, consider increasing CRAWLER_SCREENSHOT_TIMEOUT_SEC", + ), + serverConfig.crawler.screenshotTimeoutSec * 1000, + ), + ), + ]); + logger.info( + `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, + ); + } catch (e) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${e}`, + ); + } + } + + return { + htmlContent, + statusCode: response?.status() ?? 
0, + screenshot, + url: page.url(), + }; + } finally { + await context.close(); + if (serverConfig.crawler.browserConnectOnDemand) { + await browser.close(); + } + } +} + +async function extractMetadata( + htmlContent: string, + url: string, + jobId: string, +) { + logger.info( + `[Crawler][${jobId}] Will attempt to extract metadata from page ...`, + ); + const meta = await metascraperParser({ + url, + html: htmlContent, + // We don't want to validate the URL again as we've already done it by visiting the page. + // This was added because URL validation fails if the URL ends with a question mark (e.g. empty query params). + validateUrl: false, + }); + logger.info(`[Crawler][${jobId}] Done extracting metadata from the page.`); + return meta; +} + +function extractReadableContent( + htmlContent: string, + url: string, + jobId: string, +) { + logger.info( + `[Crawler][${jobId}] Will attempt to extract readable content ...`, + ); + const dom = new JSDOM(htmlContent, { url }); + const readableContent = new Readability(dom.window.document).parse(); + if (!readableContent || typeof readableContent.content !== "string") { + return null; + } + + const window = new JSDOM("").window; + const purify = DOMPurify(window); + const purifiedHTML = purify.sanitize(readableContent.content); + + logger.info(`[Crawler][${jobId}] Done extracting readable content.`); + return { + content: purifiedHTML, + textContent: readableContent.textContent, + }; +} + +async function storeScreenshot( + screenshot: Buffer | undefined, + userId: string, + jobId: string, +) { + if (!serverConfig.crawler.storeScreenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as per the config.`, + ); + return null; + } + if (!screenshot) { + logger.info( + `[Crawler][${jobId}] Skipping storing the screenshot as it's empty.`, + ); + return null; + } + const assetId = newAssetId(); + const contentType = "image/png"; + const fileName = "screenshot.png"; + await saveAsset({ + userId, + 
assetId, + metadata: { contentType, fileName }, + asset: screenshot, + }); + logger.info( + `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, + ); + return { assetId, contentType, fileName, size: screenshot.byteLength }; +} + +async function downloadAndStoreFile( + url: string, + userId: string, + jobId: string, + fileType: string, + abortSignal: AbortSignal, +) { + try { + logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`); + const response = await fetch(url, { + signal: abortSignal, + }); + if (!response.ok) { + throw new Error(`Failed to download ${fileType}: ${response.status}`); + } + const buffer = await response.arrayBuffer(); + const assetId = newAssetId(); + + const contentType = response.headers.get("content-type"); + if (!contentType) { + throw new Error("No content type in the response"); + } + + await saveAsset({ + userId, + assetId, + metadata: { contentType }, + asset: Buffer.from(buffer), + }); + + logger.info( + `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId}`, + ); + + return { assetId, userId, contentType, size: buffer.byteLength }; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, + ); + return null; + } +} + +async function downloadAndStoreImage( + url: string, + userId: string, + jobId: string, + abortSignal: AbortSignal, +) { + if (!serverConfig.crawler.downloadBannerImage) { + logger.info( + `[Crawler][${jobId}] Skipping downloading the image as per the config.`, + ); + return null; + } + return downloadAndStoreFile(url, userId, jobId, "image", abortSignal); +} + +async function archiveWebpage( + html: string, + url: string, + userId: string, + jobId: string, + abortSignal: AbortSignal, +) { + logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`); + const assetId = newAssetId(); + const assetPath = `/tmp/${assetId}`; + + await execa({ + input: html, + cancelSignal: abortSignal, + })("monolith", ["-", "-Ije", "-t", 
"5", "-b", url, "-o", assetPath]); + + const contentType = "text/html"; + + await saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata: { + contentType, + }, + }); + + logger.info( + `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + ); + + return { + assetId, + contentType, + size: await getAssetSize({ userId, assetId }), + }; +} + +async function getContentType( + url: string, + jobId: string, + abortSignal: AbortSignal, +): Promise { + try { + logger.info( + `[Crawler][${jobId}] Attempting to determine the content-type for the url ${url}`, + ); + const response = await fetch(url, { + method: "HEAD", + signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]), + }); + const contentType = response.headers.get("content-type"); + logger.info( + `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`, + ); + return contentType; + } catch (e) { + logger.error( + `[Crawler][${jobId}] Failed to determine the content-type for the url ${url}: ${e}`, + ); + return null; + } +} + +/** + * Downloads the asset from the URL and transforms the linkBookmark to an assetBookmark + * @param url the url the user provided + * @param assetType the type of the asset we're downloading + * @param userId the id of the user + * @param jobId the id of the job for logging + * @param bookmarkId the id of the bookmark + */ +async function handleAsAssetBookmark( + url: string, + assetType: "image" | "pdf", + userId: string, + jobId: string, + bookmarkId: string, + abortSignal: AbortSignal, +) { + const downloaded = await downloadAndStoreFile( + url, + userId, + jobId, + assetType, + abortSignal, + ); + if (!downloaded) { + return; + } + const fileName = path.basename(new URL(url).pathname); + await db.transaction(async (trx) => { + await updateAsset( + undefined, + { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.BOOKMARK_ASSET, + contentType: downloaded.contentType, + size: downloaded.size, + fileName, + }, + 
trx, + ); + await trx.insert(bookmarkAssets).values({ + id: bookmarkId, + assetType, + assetId: downloaded.assetId, + content: null, + fileName, + sourceUrl: url, + }); + // Switch the type of the bookmark from LINK to ASSET + await trx + .update(bookmarks) + .set({ type: BookmarkTypes.ASSET }) + .where(eq(bookmarks.id, bookmarkId)); + await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId)); + }); + await AssetPreprocessingQueue.enqueue({ + bookmarkId, + fixMode: false, + }); +} + +async function crawlAndParseUrl( + url: string, + userId: string, + jobId: string, + bookmarkId: string, + oldScreenshotAssetId: string | undefined, + oldImageAssetId: string | undefined, + oldFullPageArchiveAssetId: string | undefined, + precrawledArchiveAssetId: string | undefined, + archiveFullPage: boolean, + abortSignal: AbortSignal, +) { + let result: { + htmlContent: string; + screenshot: Buffer | undefined; + statusCode: number | null; + url: string; + }; + + if (precrawledArchiveAssetId) { + logger.info( + `[Crawler][${jobId}] The page has been precrawled. 
Will use the precrawled archive instead.`, + ); + const asset = await readAsset({ + userId, + assetId: precrawledArchiveAssetId, + }); + result = { + htmlContent: asset.asset.toString(), + screenshot: undefined, + statusCode: 200, + url, + }; + } else { + result = await crawlPage(jobId, url, abortSignal); + } + abortSignal.throwIfAborted(); + + const { htmlContent, screenshot, statusCode, url: browserUrl } = result; + + const [meta, readableContent, screenshotAssetInfo] = await Promise.all([ + extractMetadata(htmlContent, browserUrl, jobId), + extractReadableContent(htmlContent, browserUrl, jobId), + storeScreenshot(screenshot, userId, jobId), + ]); + abortSignal.throwIfAborted(); + let imageAssetInfo: DBAssetType | null = null; + if (meta.image) { + const downloaded = await downloadAndStoreImage( + meta.image, + userId, + jobId, + abortSignal, + ); + if (downloaded) { + imageAssetInfo = { + id: downloaded.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_BANNER_IMAGE, + contentType: downloaded.contentType, + size: downloaded.size, + }; + } + } + abortSignal.throwIfAborted(); + + const parseDate = (date: string | undefined) => { + if (!date) { + return null; + } + try { + return new Date(date); + } catch (_e) { + return null; + } + }; + + // TODO(important): Restrict the size of content to store + await db.transaction(async (txn) => { + await txn + .update(bookmarkLinks) + .set({ + title: meta.title, + description: meta.description, + // Don't store data URIs as they're not valid URLs and are usually quite large + imageUrl: meta.image?.startsWith("data:") ? 
null : meta.image, + favicon: meta.logo, + content: readableContent?.textContent, + htmlContent: readableContent?.content, + crawledAt: new Date(), + crawlStatusCode: statusCode, + author: meta.author, + publisher: meta.publisher, + datePublished: parseDate(meta.datePublished), + dateModified: parseDate(meta.dateModified), + }) + .where(eq(bookmarkLinks.id, bookmarkId)); + + if (screenshotAssetInfo) { + await updateAsset( + oldScreenshotAssetId, + { + id: screenshotAssetInfo.assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_SCREENSHOT, + contentType: screenshotAssetInfo.contentType, + size: screenshotAssetInfo.size, + fileName: screenshotAssetInfo.fileName, + }, + txn, + ); + } + if (imageAssetInfo) { + await updateAsset(oldImageAssetId, imageAssetInfo, txn); + } + }); + + // Delete the old assets if any + await Promise.all([ + silentDeleteAsset(userId, oldScreenshotAssetId), + silentDeleteAsset(userId, oldImageAssetId), + ]); + + return async () => { + if ( + !precrawledArchiveAssetId && + (serverConfig.crawler.fullPageArchive || archiveFullPage) + ) { + const { + assetId: fullPageArchiveAssetId, + size, + contentType, + } = await archiveWebpage( + htmlContent, + browserUrl, + userId, + jobId, + abortSignal, + ); + + await db.transaction(async (txn) => { + await updateAsset( + oldFullPageArchiveAssetId, + { + id: fullPageArchiveAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, + contentType, + size, + fileName: null, + }, + txn, + ); + }); + if (oldFullPageArchiveAssetId) { + silentDeleteAsset(userId, oldFullPageArchiveAssetId); + } + } + }; +} + +async function runCrawler(job: DequeuedJob) { + const jobId = job.id ?? 
"unknown"; + + const request = zCrawlLinkRequestSchema.safeParse(job.data); + if (!request.success) { + logger.error( + `[Crawler][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + return; + } + + const { bookmarkId, archiveFullPage } = request.data; + const { + url, + userId, + screenshotAssetId: oldScreenshotAssetId, + imageAssetId: oldImageAssetId, + fullPageArchiveAssetId: oldFullPageArchiveAssetId, + precrawledArchiveAssetId, + } = await getBookmarkDetails(bookmarkId); + + logger.info( + `[Crawler][${jobId}] Will crawl "${url}" for link with id "${bookmarkId}"`, + ); + validateUrl(url); + + const contentType = await getContentType(url, jobId, job.abortSignal); + + // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage + const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF; + + if (isPdf) { + await handleAsAssetBookmark( + url, + "pdf", + userId, + jobId, + bookmarkId, + job.abortSignal, + ); + } else if ( + contentType && + IMAGE_ASSET_TYPES.has(contentType) && + SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType) + ) { + await handleAsAssetBookmark( + url, + "image", + userId, + jobId, + bookmarkId, + job.abortSignal, + ); + } else { + const archivalLogic = await crawlAndParseUrl( + url, + userId, + jobId, + bookmarkId, + oldScreenshotAssetId, + oldImageAssetId, + oldFullPageArchiveAssetId, + precrawledArchiveAssetId, + archiveFullPage, + job.abortSignal, + ); + + // Enqueue openai job (if not set, assume it's true for backward compatibility) + if (job.data.runInference !== false) { + await OpenAIQueue.enqueue({ + bookmarkId, + type: "tag", + }); + await OpenAIQueue.enqueue({ + bookmarkId, + type: "summarize", + }); + } + + // Update the search index + await triggerSearchReindex(bookmarkId); + + // Trigger a potential download of a video from the URL + await triggerVideoWorker(bookmarkId, url); + + // Trigger a webhook + await triggerWebhook(bookmarkId, "crawled"); + + // Do the 
archival as a separate last step as it has the potential for failure + await archivalLogic(); + } +} diff --git a/apps/workers/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts new file mode 100644 index 00000000..1eaba0c3 --- /dev/null +++ b/apps/workers/workers/feedWorker.ts @@ -0,0 +1,215 @@ +import { and, eq, inArray } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import cron from "node-cron"; +import Parser from "rss-parser"; +import { buildImpersonatingTRPCClient } from "trpc"; +import { z } from "zod"; + +import type { ZFeedRequestSchema } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { rssFeedImportsTable, rssFeedsTable } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { FeedQueue } from "@karakeep/shared/queues"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +export const FeedRefreshingWorker = cron.schedule( + "0 * * * *", + () => { + logger.info("[feed] Scheduling feed refreshing jobs ..."); + db.query.rssFeedsTable + .findMany({ + columns: { + id: true, + }, + where: eq(rssFeedsTable.enabled, true), + }) + .then((feeds) => { + for (const feed of feeds) { + FeedQueue.enqueue( + { + feedId: feed.id, + }, + { + idempotencyKey: feed.id, + }, + ); + } + }); + }, + { + runOnInit: false, + scheduled: false, + }, +); + +export class FeedWorker { + static build() { + logger.info("Starting feed worker ..."); + const worker = new Runner( + FeedQueue, + { + run: run, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[feed][${jobId}] Completed successfully`); + await db + .update(rssFeedsTable) + .set({ lastFetchedStatus: "success", lastFetchedAt: new Date() }) + .where(eq(rssFeedsTable.id, job.data?.feedId)); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[feed][${jobId}] Feed fetch job failed: ${job.error}\n${job.error.stack}`, + ); + if (job.data) { + await db + .update(rssFeedsTable) + .set({ 
lastFetchedStatus: "failure", lastFetchedAt: new Date() }) + .where(eq(rssFeedsTable.id, job.data?.feedId)); + } + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function run(req: DequeuedJob) { + const jobId = req.id; + const feed = await db.query.rssFeedsTable.findFirst({ + where: eq(rssFeedsTable.id, req.data.feedId), + }); + if (!feed) { + throw new Error( + `[feed][${jobId}] Feed with id ${req.data.feedId} not found`, + ); + } + logger.info( + `[feed][${jobId}] Starting fetching feed "${feed.name}" (${feed.id}) ...`, + ); + + const response = await fetch(feed.url, { + signal: AbortSignal.timeout(5000), + headers: { + UserAgent: + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36", + Accept: "application/rss+xml", + }, + }); + if (response.status !== 200) { + throw new Error( + `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) returned a non-success status: ${response.status}.`, + ); + } + const contentType = response.headers.get("content-type"); + if (!contentType || !contentType.includes("xml")) { + throw new Error( + `[feed][${jobId}] Feed "${feed.name}" (${feed.id}) is not a valid RSS feed`, + ); + } + const xmlData = await response.text(); + + logger.info( + `[feed][${jobId}] Successfully fetched feed "${feed.name}" (${feed.id}) ...`, + ); + + const parser = new Parser({ + customFields: { + item: ["id"], + }, + }); + const unparseFeedData = await parser.parseString(xmlData); + + // Apparently, we can't trust the output of the xml parser. So let's do our own type + // validation. + const feedItemsSchema = z.object({ + id: z.coerce.string(), + link: z.string().optional(), + guid: z.string().optional(), + }); + + const feedItems = unparseFeedData.items + .map((i) => feedItemsSchema.safeParse(i)) + .flatMap((i) => (i.success ? 
[i.data] : [])); + + logger.info( + `[feed][${jobId}] Found ${feedItems.length} entries in feed "${feed.name}" (${feed.id}) ...`, + ); + + if (feedItems.length === 0) { + logger.info(`[feed][${jobId}] No entries found.`); + return; + } + + // For feeds that don't have guids, use the link as the id + feedItems.forEach((item) => { + item.guid = item.guid ?? `${item.id}` ?? item.link; + }); + + const exitingEntries = await db.query.rssFeedImportsTable.findMany({ + where: and( + eq(rssFeedImportsTable.rssFeedId, feed.id), + inArray( + rssFeedImportsTable.entryId, + feedItems.map((item) => item.guid).filter((id): id is string => !!id), + ), + ), + }); + + const newEntries = feedItems.filter( + (item) => + !exitingEntries.some((entry) => entry.entryId === item.guid) && + item.link && + item.guid, + ); + + if (newEntries.length === 0) { + logger.info( + `[feed][${jobId}] No new entries found in feed "${feed.name}" (${feed.id}).`, + ); + return; + } + + logger.info( + `[feed][${jobId}] Found ${newEntries.length} new entries in feed "${feed.name}" (${feed.id}) ...`, + ); + + const trpcClient = await buildImpersonatingTRPCClient(feed.userId); + + const createdBookmarks = await Promise.allSettled( + newEntries.map((item) => + trpcClient.bookmarks.createBookmark({ + type: BookmarkTypes.LINK, + url: item.link!, + }), + ), + ); + + // It's ok if this is not transactional as the bookmarks will get linked in the next iteration. + await db + .insert(rssFeedImportsTable) + .values( + newEntries.map((item, idx) => { + const b = createdBookmarks[idx]; + return { + entryId: item.guid!, + bookmarkId: b.status === "fulfilled" ? 
b.value.id : null, + rssFeedId: feed.id, + }; + }), + ) + .onConflictDoNothing(); + + logger.info( + `[feed][${jobId}] Successfully imported ${newEntries.length} new enteries from feed "${feed.name}" (${feed.id}).`, + ); + + return Promise.resolve(); +} diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts new file mode 100644 index 00000000..f7492c8b --- /dev/null +++ b/apps/workers/workers/inference/inferenceWorker.ts @@ -0,0 +1,100 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; + +import type { ZOpenAIRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; +import { InferenceClientFactory } from "@karakeep/shared/inference"; +import logger from "@karakeep/shared/logger"; +import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues"; + +import { runSummarization } from "./summarize"; +import { runTagging } from "./tagging"; + +async function attemptMarkStatus( + jobData: object | undefined, + status: "success" | "failure", +) { + if (!jobData) { + return; + } + try { + const request = zOpenAIRequestSchema.parse(jobData); + await db + .update(bookmarks) + .set({ + ...(request.type === "summarize" + ? { summarizationStatus: status } + : {}), + ...(request.type === "tag" ? 
{ taggingStatus: status } : {}), + }) + .where(eq(bookmarks.id, request.bookmarkId)); + } catch (e) { + logger.error(`Something went wrong when marking the tagging status: ${e}`); + } +} + +export class OpenAiWorker { + static build() { + logger.info("Starting inference worker ..."); + const worker = new Runner( + OpenAIQueue, + { + run: runOpenAI, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[inference][${jobId}] Completed successfully`); + await attemptMarkStatus(job.data, "success"); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`, + ); + if (job.numRetriesLeft == 0) { + await attemptMarkStatus(job?.data, "failure"); + } + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: serverConfig.inference.jobTimeoutSec, + }, + ); + + return worker; + } +} + +async function runOpenAI(job: DequeuedJob) { + const jobId = job.id; + + const inferenceClient = InferenceClientFactory.build(); + if (!inferenceClient) { + logger.debug( + `[inference][${jobId}] No inference client configured, nothing to do now`, + ); + return; + } + + const request = zOpenAIRequestSchema.safeParse(job.data); + if (!request.success) { + throw new Error( + `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + } + + const { bookmarkId } = request.data; + switch (request.data.type) { + case "summarize": + await runSummarization(bookmarkId, job, inferenceClient); + break; + case "tag": + await runTagging(bookmarkId, job, inferenceClient); + break; + default: + throw new Error(`Unknown inference type: ${request.data.type}`); + } +} diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts new file mode 100644 index 00000000..a832fe0a --- /dev/null +++ b/apps/workers/workers/inference/summarize.ts @@ -0,0 +1,123 @@ +import { and, eq } from "drizzle-orm"; +import { DequeuedJob } from 
"liteque"; + +import { db } from "@karakeep/db"; +import { bookmarks, customPrompts } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; +import { InferenceClient } from "@karakeep/shared/inference"; +import logger from "@karakeep/shared/logger"; +import { buildSummaryPrompt } from "@karakeep/shared/prompts"; +import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +async function fetchBookmarkDetailsForSummary(bookmarkId: string) { + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + columns: { id: true, userId: true, type: true }, + with: { + link: { + columns: { + title: true, + description: true, + content: true, + publisher: true, + author: true, + url: true, + }, + }, + // If assets (like PDFs with extracted text) should be summarized, extend here + }, + }); + + if (!bookmark) { + throw new Error(`Bookmark with id ${bookmarkId} not found`); + } + return bookmark; +} + +export async function runSummarization( + bookmarkId: string, + job: DequeuedJob, + inferenceClient: InferenceClient, +) { + if (!serverConfig.inference.enableAutoSummarization) { + logger.info( + `[inference][${job.id}] Skipping summarization job for bookmark with id "${bookmarkId}" because it's disabled in the config.`, + ); + return; + } + const jobId = job.id; + + logger.info( + `[inference][${jobId}] Starting a summary job for bookmark with id "${bookmarkId}"`, + ); + + const bookmarkData = await fetchBookmarkDetailsForSummary(bookmarkId); + + let textToSummarize = ""; + if (bookmarkData.type === BookmarkTypes.LINK && bookmarkData.link) { + const link = bookmarkData.link; + textToSummarize = ` +Title: ${link.title ?? ""} +Description: ${link.description ?? ""} +Content: ${link.content ?? ""} +Publisher: ${link.publisher ?? ""} +Author: ${link.author ?? ""} +URL: ${link.url ?? 
""} +`; + } else { + logger.warn( + `[inference][${jobId}] Bookmark ${bookmarkId} (type: ${bookmarkData.type}) is not a LINK or TEXT type with content, or content is missing. Skipping summary.`, + ); + return; + } + + if (!textToSummarize.trim()) { + logger.info( + `[inference][${jobId}] No content to summarize for bookmark ${bookmarkId}.`, + ); + return; + } + + const prompts = await db.query.customPrompts.findMany({ + where: and( + eq(customPrompts.userId, bookmarkData.userId), + eq(customPrompts.appliesTo, "summary"), + ), + columns: { + text: true, + }, + }); + + const summaryPrompt = buildSummaryPrompt( + serverConfig.inference.inferredTagLang, + prompts.map((p) => p.text), + textToSummarize, + serverConfig.inference.contextLength, + ); + + const summaryResult = await inferenceClient.inferFromText(summaryPrompt, { + schema: null, // Summaries are typically free-form text + abortSignal: job.abortSignal, + }); + + if (!summaryResult.response) { + throw new Error( + `[inference][${jobId}] Failed to summarize bookmark ${bookmarkId}, empty response from inference client.`, + ); + } + + logger.info( + `[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`, + ); + + await db + .update(bookmarks) + .set({ + summary: summaryResult.response, + modifiedAt: new Date(), + }) + .where(eq(bookmarks.id, bookmarkId)); + + await triggerSearchReindex(bookmarkId); +} diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts new file mode 100644 index 00000000..35c366c7 --- /dev/null +++ b/apps/workers/workers/inference/tagging.ts @@ -0,0 +1,399 @@ +import { and, Column, eq, inArray, sql } from "drizzle-orm"; +import { DequeuedJob } from "liteque"; +import { buildImpersonatingTRPCClient } from "trpc"; +import { z } from "zod"; + +import type { InferenceClient } from "@karakeep/shared/inference"; +import type { ZOpenAIRequest } from "@karakeep/shared/queues"; +import { db } from 
"@karakeep/db"; +import { + bookmarks, + bookmarkTags, + customPrompts, + tagsOnBookmarks, +} from "@karakeep/db/schema"; +import { readAsset } from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; +import { + triggerRuleEngineOnEvent, + triggerSearchReindex, + triggerWebhook, +} from "@karakeep/shared/queues"; + +const openAIResponseSchema = z.object({ + tags: z.array(z.string()), +}); + +function tagNormalizer(col: Column) { + function normalizeTag(tag: string) { + return tag.toLowerCase().replace(/[ \-_]/g, ""); + } + + return { + normalizeTag, + sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`, + }; +} +async function buildPrompt( + bookmark: NonNullable>>, +) { + const prompts = await fetchCustomPrompts(bookmark.userId, "text"); + if (bookmark.link) { + if (!bookmark.link.description && !bookmark.link.content) { + throw new Error( + `No content found for link "${bookmark.id}". Skipping ...`, + ); + } + + const content = bookmark.link.content; + return buildTextPrompt( + serverConfig.inference.inferredTagLang, + prompts, + `URL: ${bookmark.link.url} +Title: ${bookmark.link.title ?? ""} +Description: ${bookmark.link.description ?? ""} +Content: ${content ?? ""}`, + serverConfig.inference.contextLength, + ); + } + + if (bookmark.text) { + return buildTextPrompt( + serverConfig.inference.inferredTagLang, + prompts, + bookmark.text.text ?? 
"", + serverConfig.inference.contextLength, + ); + } + + throw new Error("Unknown bookmark type"); +} + +async function inferTagsFromImage( + jobId: string, + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + const { asset, metadata } = await readAsset({ + userId: bookmark.userId, + assetId: bookmark.asset.assetId, + }); + + if (!asset) { + throw new Error( + `[inference][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmark.id} not found`, + ); + } + + const base64 = asset.toString("base64"); + return inferenceClient.inferFromImage( + buildImagePrompt( + serverConfig.inference.inferredTagLang, + await fetchCustomPrompts(bookmark.userId, "images"), + ), + metadata.contentType, + base64, + { schema: openAIResponseSchema, abortSignal }, + ); +} + +async function fetchCustomPrompts( + userId: string, + appliesTo: "text" | "images", +) { + const prompts = await db.query.customPrompts.findMany({ + where: and( + eq(customPrompts.userId, userId), + inArray(customPrompts.appliesTo, ["all_tagging", appliesTo]), + ), + columns: { + text: true, + }, + }); + + let promptTexts = prompts.map((p) => p.text); + if (containsTagsPlaceholder(prompts)) { + promptTexts = await replaceTagsPlaceholders(promptTexts, userId); + } + + return promptTexts; +} + +async function replaceTagsPlaceholders( + prompts: string[], + userId: string, +): Promise { + const api = await buildImpersonatingTRPCClient(userId); + const tags = (await api.tags.list()).tags; + const tagsString = `[${tags.map((tag) => tag.name).join(", ")}]`; + const aiTagsString = `[${tags + .filter((tag) => tag.numBookmarksByAttachedType.human ?? 0 == 0) + .map((tag) => tag.name) + .join(", ")}]`; + const userTagsString = `[${tags + .filter((tag) => tag.numBookmarksByAttachedType.human ?? 
0 > 0) + .map((tag) => tag.name) + .join(", ")}]`; + + return prompts.map((p) => + p + .replaceAll("$tags", tagsString) + .replaceAll("$aiTags", aiTagsString) + .replaceAll("$userTags", userTagsString), + ); +} + +function containsTagsPlaceholder(prompts: { text: string }[]): boolean { + return ( + prompts.filter( + (p) => + p.text.includes("$tags") || + p.text.includes("$aiTags") || + p.text.includes("$userTags"), + ).length > 0 + ); +} + +async function inferTagsFromPDF( + _jobId: string, + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + const prompt = buildTextPrompt( + serverConfig.inference.inferredTagLang, + await fetchCustomPrompts(bookmark.userId, "text"), + `Content: ${bookmark.asset.content}`, + serverConfig.inference.contextLength, + ); + return inferenceClient.inferFromText(prompt, { + schema: openAIResponseSchema, + abortSignal, + }); +} + +async function inferTagsFromText( + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + return await inferenceClient.inferFromText(await buildPrompt(bookmark), { + schema: openAIResponseSchema, + abortSignal, + }); +} + +async function inferTags( + jobId: string, + bookmark: NonNullable>>, + inferenceClient: InferenceClient, + abortSignal: AbortSignal, +) { + let response; + if (bookmark.link || bookmark.text) { + response = await inferTagsFromText(bookmark, inferenceClient, abortSignal); + } else if (bookmark.asset) { + switch (bookmark.asset.assetType) { + case "image": + response = await inferTagsFromImage( + jobId, + bookmark, + inferenceClient, + abortSignal, + ); + break; + case "pdf": + response = await inferTagsFromPDF( + jobId, + bookmark, + inferenceClient, + abortSignal, + ); + break; + default: + throw new Error(`[inference][${jobId}] Unsupported bookmark type`); + } + } else { + throw new Error(`[inference][${jobId}] Unsupported bookmark type`); + } + + if (!response) { + throw new Error(`[inference][${jobId}] 
Inference response is empty`); + } + + try { + let tags = openAIResponseSchema.parse(JSON.parse(response.response)).tags; + logger.info( + `[inference][${jobId}] Inferring tag for bookmark "${bookmark.id}" used ${response.totalTokens} tokens and inferred: ${tags}`, + ); + + // Sometimes the tags contain the hashtag symbol, let's strip them out if they do. + // Additionally, trim the tags to prevent whitespaces at the beginning/the end of the tag. + tags = tags.map((t) => { + let tag = t; + if (tag.startsWith("#")) { + tag = t.slice(1); + } + return tag.trim(); + }); + + return tags; + } catch (e) { + const responseSneak = response.response.substring(0, 20); + throw new Error( + `[inference][${jobId}] The model ignored our prompt and didn't respond with the expected JSON: ${JSON.stringify(e)}. Here's a sneak peak from the response: ${responseSneak}`, + ); + } +} + +async function connectTags( + bookmarkId: string, + inferredTags: string[], + userId: string, +) { + if (inferredTags.length == 0) { + return; + } + + await db.transaction(async (tx) => { + // Attempt to match exiting tags with the new ones + const { matchedTagIds, notFoundTagNames } = await (async () => { + const { normalizeTag, sql: normalizedTagSql } = tagNormalizer( + bookmarkTags.name, + ); + const normalizedInferredTags = inferredTags.map((t) => ({ + originalTag: t, + normalizedTag: normalizeTag(t), + })); + + const matchedTags = await tx.query.bookmarkTags.findMany({ + where: and( + eq(bookmarkTags.userId, userId), + inArray( + normalizedTagSql, + normalizedInferredTags.map((t) => t.normalizedTag), + ), + ), + }); + + const matchedTagIds = matchedTags.map((r) => r.id); + const notFoundTagNames = normalizedInferredTags + .filter( + (t) => + !matchedTags.some( + (mt) => normalizeTag(mt.name) === t.normalizedTag, + ), + ) + .map((t) => t.originalTag); + + return { matchedTagIds, notFoundTagNames }; + })(); + + // Create tags that didn't exist previously + let newTagIds: string[] = []; + if 
(notFoundTagNames.length > 0) { + newTagIds = ( + await tx + .insert(bookmarkTags) + .values( + notFoundTagNames.map((t) => ({ + name: t, + userId, + })), + ) + .onConflictDoNothing() + .returning() + ).map((t) => t.id); + } + + // Delete old AI tags + const detachedTags = await tx + .delete(tagsOnBookmarks) + .where( + and( + eq(tagsOnBookmarks.attachedBy, "ai"), + eq(tagsOnBookmarks.bookmarkId, bookmarkId), + ), + ) + .returning(); + + const allTagIds = new Set([...matchedTagIds, ...newTagIds]); + + // Attach new ones + const attachedTags = await tx + .insert(tagsOnBookmarks) + .values( + [...allTagIds].map((tagId) => ({ + tagId, + bookmarkId, + attachedBy: "ai" as const, + })), + ) + .onConflictDoNothing() + .returning(); + + await triggerRuleEngineOnEvent(bookmarkId, [ + ...detachedTags.map((t) => ({ + type: "tagRemoved" as const, + tagId: t.tagId, + })), + ...attachedTags.map((t) => ({ + type: "tagAdded" as const, + tagId: t.tagId, + })), + ]); + }); +} + +async function fetchBookmark(linkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, linkId), + with: { + link: true, + text: true, + asset: true, + }, + }); +} + +export async function runTagging( + bookmarkId: string, + job: DequeuedJob, + inferenceClient: InferenceClient, +) { + if (!serverConfig.inference.enableAutoTagging) { + logger.info( + `[inference][${job.id}] Skipping tagging job for bookmark with id "${bookmarkId}" because it's disabled in the config.`, + ); + return; + } + const jobId = job.id; + const bookmark = await fetchBookmark(bookmarkId); + if (!bookmark) { + throw new Error( + `[inference][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + + logger.info( + `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`, + ); + + const tags = await inferTags( + jobId, + bookmark, + inferenceClient, + job.abortSignal, + ); + + await connectTags(bookmarkId, tags, bookmark.userId); + + // Trigger a webhook + await 
triggerWebhook(bookmarkId, "ai tagged"); + + // Update the search index + await triggerSearchReindex(bookmarkId); +} diff --git a/apps/workers/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts new file mode 100644 index 00000000..427cc383 --- /dev/null +++ b/apps/workers/workers/ruleEngineWorker.ts @@ -0,0 +1,86 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import { buildImpersonatingAuthedContext } from "trpc"; + +import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { + RuleEngineQueue, + zRuleEngineRequestSchema, +} from "@karakeep/shared/queues"; +import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; + +export class RuleEngineWorker { + static build() { + logger.info("Starting rule engine worker ..."); + const worker = new Runner( + RuleEngineQueue, + { + run: runRuleEngine, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[ruleEngine][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 10, + validator: zRuleEngineRequestSchema, + }, + ); + + return worker; + } +} + +async function getBookmarkUserId(bookmarkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + columns: { + userId: true, + }, + }); +} + +async function runRuleEngine(job: DequeuedJob) { + const jobId = job.id; + const { bookmarkId, events } = job.data; + + const bookmark = await getBookmarkUserId(bookmarkId); + if (!bookmark) { + throw new Error( + `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + const userId = 
bookmark.userId; + const authedCtx = await buildImpersonatingAuthedContext(userId); + + const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); + + const results = ( + await Promise.all(events.map((event) => ruleEngine.onEvent(event))) + ).flat(); + + if (results.length == 0) { + return; + } + + const message = results + .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`) + .join("\n"); + + logger.info( + `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`, + ); +} diff --git a/apps/workers/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts new file mode 100644 index 00000000..e7b827a9 --- /dev/null +++ b/apps/workers/workers/searchWorker.ts @@ -0,0 +1,156 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; + +import type { ZSearchIndexingRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { + SearchIndexingQueue, + zSearchIndexingRequestSchema, +} from "@karakeep/shared/queues"; +import { getSearchIdxClient } from "@karakeep/shared/search"; + +export class SearchIndexingWorker { + static build() { + logger.info("Starting search indexing worker ..."); + const worker = new Runner( + SearchIndexingQueue, + { + run: runSearchIndexing, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[search][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function ensureTaskSuccess( + searchClient: NonNullable>>, + taskUid: number, +) { + const task = await searchClient.waitForTask(taskUid); + if 
(task.error) { + throw new Error(`Search task failed: ${task.error.message}`); + } +} + +async function runIndex( + searchClient: NonNullable>>, + bookmarkId: string, +) { + const bookmark = await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + link: true, + text: true, + asset: true, + tagsOnBookmarks: { + with: { + tag: true, + }, + }, + }, + }); + + if (!bookmark) { + throw new Error(`Bookmark ${bookmarkId} not found`); + } + + const task = await searchClient.addDocuments( + [ + { + id: bookmark.id, + userId: bookmark.userId, + ...(bookmark.link + ? { + url: bookmark.link.url, + linkTitle: bookmark.link.title, + description: bookmark.link.description, + content: bookmark.link.content, + publisher: bookmark.link.publisher, + author: bookmark.link.author, + datePublished: bookmark.link.datePublished, + dateModified: bookmark.link.dateModified, + } + : undefined), + ...(bookmark.asset + ? { + content: bookmark.asset.content, + metadata: bookmark.asset.metadata, + } + : undefined), + ...(bookmark.text ? 
{ content: bookmark.text.text } : undefined), + note: bookmark.note, + summary: bookmark.summary, + title: bookmark.title, + createdAt: bookmark.createdAt.toISOString(), + tags: bookmark.tagsOnBookmarks.map((t) => t.tag.name), + }, + ], + { + primaryKey: "id", + }, + ); + await ensureTaskSuccess(searchClient, task.taskUid); +} + +async function runDelete( + searchClient: NonNullable>>, + bookmarkId: string, +) { + const task = await searchClient.deleteDocument(bookmarkId); + await ensureTaskSuccess(searchClient, task.taskUid); +} + +async function runSearchIndexing(job: DequeuedJob) { + const jobId = job.id; + + const request = zSearchIndexingRequestSchema.safeParse(job.data); + if (!request.success) { + throw new Error( + `[search][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + } + + const searchClient = await getSearchIdxClient(); + if (!searchClient) { + logger.debug( + `[search][${jobId}] Search is not configured, nothing to do now`, + ); + return; + } + + const bookmarkId = request.data.bookmarkId; + logger.info( + `[search][${jobId}] Attempting to index bookmark with id ${bookmarkId} ...`, + ); + + switch (request.data.type) { + case "index": { + await runIndex(searchClient, bookmarkId); + break; + } + case "delete": { + await runDelete(searchClient, bookmarkId); + break; + } + } +} diff --git a/apps/workers/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts new file mode 100644 index 00000000..d4c8abdb --- /dev/null +++ b/apps/workers/workers/tidyAssetsWorker.ts @@ -0,0 +1,107 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; + +import { db } from "@karakeep/db"; +import { assets } from "@karakeep/db/schema"; +import { deleteAsset, getAllAssets } from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { + TidyAssetsQueue, + ZTidyAssetsRequest, + zTidyAssetsRequestSchema, +} from "@karakeep/shared/queues"; + +export class TidyAssetsWorker { + 
static build() { + logger.info("Starting tidy assets worker ..."); + const worker = new Runner( + TidyAssetsQueue, + { + run: runTidyAssets, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[tidyAssets][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 30, + }, + ); + + return worker; + } +} + +async function handleAsset( + asset: { + assetId: string; + userId: string; + size: number; + contentType: string; + fileName?: string | null; + }, + request: ZTidyAssetsRequest, + jobId: string, +) { + const dbRow = await db.query.assets.findFirst({ + where: eq(assets.id, asset.assetId), + }); + if (!dbRow) { + if (request.cleanDanglingAssets) { + await deleteAsset({ userId: asset.userId, assetId: asset.assetId }); + logger.info( + `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. Deleting it.`, + ); + } else { + logger.warn( + `[tidyAssets][${jobId}] Asset ${asset.assetId} not found in the database. 
Not deleting it because cleanDanglingAssets is false.`, + ); + } + return; + } + + if (request.syncAssetMetadata) { + await db + .update(assets) + .set({ + contentType: asset.contentType, + fileName: asset.fileName, + size: asset.size, + }) + .where(eq(assets.id, asset.assetId)); + logger.info( + `[tidyAssets][${jobId}] Updated metadata for asset ${asset.assetId}`, + ); + } +} + +async function runTidyAssets(job: DequeuedJob) { + const jobId = job.id; + + const request = zTidyAssetsRequestSchema.safeParse(job.data); + if (!request.success) { + throw new Error( + `[tidyAssets][${jobId}] Got malformed job request: ${request.error.toString()}`, + ); + } + + for await (const asset of getAllAssets()) { + try { + await handleAsset(asset, request.data, jobId); + } catch (e) { + logger.error( + `[tidyAssets][${jobId}] Failed to tidy asset ${asset.assetId}: ${e}`, + ); + } + } +} diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts new file mode 100644 index 00000000..3fa3e49e --- /dev/null +++ b/apps/workers/workers/videoWorker.ts @@ -0,0 +1,214 @@ +import fs from "fs"; +import * as os from "os"; +import path from "path"; +import { execa } from "execa"; +import { DequeuedJob, Runner } from "liteque"; + +import { db } from "@karakeep/db"; +import { AssetTypes } from "@karakeep/db/schema"; +import { + ASSET_TYPES, + getAssetSize, + newAssetId, + saveAssetFromFile, + silentDeleteAsset, +} from "@karakeep/shared/assetdb"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + VideoWorkerQueue, + ZVideoRequest, + zvideoRequestSchema, +} from "@karakeep/shared/queues"; + +import { withTimeout } from "../utils"; +import { getBookmarkDetails, updateAsset } from "../workerUtils"; + +const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads"); + +export class VideoWorker { + static build() { + logger.info("Starting video worker ..."); + + return new Runner( + VideoWorkerQueue, + { + run: 
withTimeout( + runWorker, + /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout, + ), + onComplete: async (job) => { + const jobId = job.id; + logger.info( + `[VideoCrawler][${jobId}] Video Download Completed successfully`, + ); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`, + ); + return Promise.resolve(); + }, + }, + { + pollIntervalMs: 1000, + timeoutSecs: serverConfig.crawler.downloadVideoTimeout, + concurrency: 1, + validator: zvideoRequestSchema, + }, + ); + } +} + +function prepareYtDlpArguments(url: string, assetPath: string) { + const ytDlpArguments = [url]; + if (serverConfig.crawler.maxVideoDownloadSize > 0) { + ytDlpArguments.push( + "-f", + `best[filesize<${serverConfig.crawler.maxVideoDownloadSize}M]`, + ); + } + + ytDlpArguments.push(...serverConfig.crawler.ytDlpArguments); + ytDlpArguments.push("-o", assetPath); + ytDlpArguments.push("--no-playlist"); + return ytDlpArguments; +} + +async function runWorker(job: DequeuedJob) { + const jobId = job.id; + const { bookmarkId } = job.data; + + const { + url, + userId, + videoAssetId: oldVideoAssetId, + } = await getBookmarkDetails(bookmarkId); + + if (!serverConfig.crawler.downloadVideo) { + logger.info( + `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it is disabled in the config.`, + ); + return; + } + + const videoAssetId = newAssetId(); + let assetPath = `${TMP_FOLDER}/${videoAssetId}`; + await fs.promises.mkdir(TMP_FOLDER, { recursive: true }); + + const ytDlpArguments = prepareYtDlpArguments(url, assetPath); + + try { + logger.info( + `[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`, + ); + + await execa("yt-dlp", ytDlpArguments, { + cancelSignal: job.abortSignal, + }); + const downloadPath = await findAssetFile(videoAssetId); + if (!downloadPath) { + 
logger.info( + `[VideoCrawler][${jobId}] yt-dlp didn't download anything. Skipping ...`, + ); + return; + } + assetPath = downloadPath; + } catch (e) { + const err = e as Error; + if ( + err.message.includes("ERROR: Unsupported URL:") || + err.message.includes("No media found") + ) { + logger.info( + `[VideoCrawler][${jobId}] Skipping video download from "${url}", because it's not one of the supported yt-dlp URLs`, + ); + return; + } + logger.error( + `[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`, + ); + await deleteLeftOverAssetFile(jobId, videoAssetId); + return; + } + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`, + ); + await saveAssetFromFile({ + userId, + assetId: videoAssetId, + assetPath, + metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, + }); + + await db.transaction(async (txn) => { + await updateAsset( + oldVideoAssetId, + { + id: videoAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_VIDEO, + contentType: ASSET_TYPES.VIDEO_MP4, + size: await getAssetSize({ userId, assetId: videoAssetId }), + }, + txn, + ); + }); + await silentDeleteAsset(userId, oldVideoAssetId); + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, + ); +} + +/** + * Deletes leftover assets in case the download fails + * + * @param jobId the id of the job + * @param assetId the id of the asset to delete + */ +async function deleteLeftOverAssetFile( + jobId: string, + assetId: string, +): Promise { + let assetFile; + try { + assetFile = await findAssetFile(assetId); + } catch { + // ignore exception, no asset file was found + return; + } + if (!assetFile) { + return; + } + logger.info( + `[VideoCrawler][${jobId}] Deleting leftover video asset "${assetFile}".`, + ); + try { + await fs.promises.rm(assetFile); + } catch (e) { + logger.error( + `[VideoCrawler][${jobId}] Failed deleting leftover video asset 
"${assetFile}".`, + ); + } +} + +/** + * yt-dlp automatically adds a file ending to the passed in filename --> we have to search it again in the folder + * + * @param assetId the id of the asset to search + * @returns the path to the downloaded asset + */ +async function findAssetFile(assetId: string): Promise { + const files = await fs.promises.readdir(TMP_FOLDER); + for (const file of files) { + if (file.startsWith(assetId)) { + return path.join(TMP_FOLDER, file); + } + } + return null; +} diff --git a/apps/workers/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts new file mode 100644 index 00000000..9d3ed2c1 --- /dev/null +++ b/apps/workers/workers/webhookWorker.ts @@ -0,0 +1,146 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import fetch from "node-fetch"; + +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; +import { + WebhookQueue, + ZWebhookRequest, + zWebhookRequestSchema, +} from "@karakeep/shared/queues"; + +export class WebhookWorker { + static build() { + logger.info("Starting webhook worker ..."); + const worker = new Runner( + WebhookQueue, + { + run: runWebhook, + onComplete: async (job) => { + const jobId = job.id; + logger.info(`[webhook][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: async (job) => { + const jobId = job.id; + logger.error( + `[webhook][${jobId}] webhook job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: + serverConfig.webhook.timeoutSec * + (serverConfig.webhook.retryTimes + 1) + + 1, //consider retry times, and timeout and add 1 second for other stuff + validator: zWebhookRequestSchema, + }, + ); + + return worker; + } +} + +async function fetchBookmark(bookmarkId: string) { + return await 
db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + with: { + link: { + columns: { + url: true, + }, + }, + user: { + columns: {}, + with: { + webhooks: true, + }, + }, + }, + }); +} + +async function runWebhook(job: DequeuedJob) { + const jobId = job.id; + const webhookTimeoutSec = serverConfig.webhook.timeoutSec; + + const { bookmarkId } = job.data; + const bookmark = await fetchBookmark(bookmarkId); + if (!bookmark) { + throw new Error( + `[webhook][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + + if (!bookmark.user.webhooks) { + return; + } + + logger.info( + `[webhook][${jobId}] Starting a webhook job for bookmark with id "${bookmark.id} for operation "${job.data.operation}"`, + ); + + await Promise.allSettled( + bookmark.user.webhooks + .filter((w) => w.events.includes(job.data.operation)) + .map(async (webhook) => { + const url = webhook.url; + const webhookToken = webhook.token; + const maxRetries = serverConfig.webhook.retryTimes; + let attempt = 0; + let success = false; + + while (attempt < maxRetries && !success) { + try { + const response = await fetch(url, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(webhookToken + ? { + Authorization: `Bearer ${webhookToken}`, + } + : {}), + }, + body: JSON.stringify({ + jobId, + bookmarkId, + userId: bookmark.userId, + url: bookmark.link ? 
bookmark.link.url : undefined, + type: bookmark.type, + operation: job.data.operation, + }), + signal: AbortSignal.timeout(webhookTimeoutSec * 1000), + }); + + if (!response.ok) { + logger.error( + `Webhook call to ${url} failed with status: ${response.status}`, + ); + } else { + logger.info( + `[webhook][${jobId}] Webhook to ${url} call succeeded`, + ); + success = true; + } + } catch (error) { + logger.error( + `[webhook][${jobId}] Webhook to ${url} call failed: ${error}`, + ); + } + attempt++; + if (!success && attempt < maxRetries) { + logger.info( + `[webhook][${jobId}] Retrying webhook call to ${url}, attempt ${attempt + 1}`, + ); + } + } + }), + ); +} diff --git a/docs/docs/03-configuration.md b/docs/docs/03-configuration.md index c6fa024e..ac8883d4 100644 --- a/docs/docs/03-configuration.md +++ b/docs/docs/03-configuration.md @@ -26,17 +26,17 @@ Only OIDC compliant OAuth providers are supported! For information on how to set When setting up OAuth, the allowed redirect URLs configured at the provider should be set to `/api/auth/callback/custom` where `` is the address you configured in `NEXTAUTH_URL` (for example: `https://try.karakeep.app/api/auth/callback/custom`). 
::: -| Name | Required | Default | Description | -| ------------------------------------------- | -------- | ---------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| DISABLE_SIGNUPS | No | false | If enabled, no new signups will be allowed and the signup button will be disabled in the UI | -| DISABLE_PASSWORD_AUTH | No | false | If enabled, only signups and logins using OAuth are allowed and the signup button and login form for local accounts will be disabled in the UI | -| OAUTH_WELLKNOWN_URL | No | Not set | The "wellknown Url" for openid-configuration as provided by the OAuth provider | -| OAUTH_CLIENT_SECRET | No | Not set | The "Client Secret" as provided by the OAuth provider | -| OAUTH_CLIENT_ID | No | Not set | The "Client ID" as provided by the OAuth provider | -| OAUTH_SCOPE | No | "openid email profile" | "Full list of scopes to request (space delimited)" | -| OAUTH_PROVIDER_NAME | No | "Custom Provider" | The name of your provider. 
Will be shown on the signup page as "Sign in with ``" | +| Name | Required | Default | Description | +| ------------------------------------------- | -------- | ---------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| DISABLE_SIGNUPS | No | false | If enabled, no new signups will be allowed and the signup button will be disabled in the UI | +| DISABLE_PASSWORD_AUTH | No | false | If enabled, only signups and logins using OAuth are allowed and the signup button and login form for local accounts will be disabled in the UI | +| OAUTH_WELLKNOWN_URL | No | Not set | The "wellknown Url" for openid-configuration as provided by the OAuth provider | +| OAUTH_CLIENT_SECRET | No | Not set | The "Client Secret" as provided by the OAuth provider | +| OAUTH_CLIENT_ID | No | Not set | The "Client ID" as provided by the OAuth provider | +| OAUTH_SCOPE | No | "openid email profile" | "Full list of scopes to request (space delimited)" | +| OAUTH_PROVIDER_NAME | No | "Custom Provider" | The name of your provider. Will be shown on the signup page as "Sign in with ``" | | OAUTH_ALLOW_DANGEROUS_EMAIL_ACCOUNT_LINKING | No | false | Whether existing accounts in karakeep stored in the database should automatically be linked with your OAuth account. Only enable it if you trust the OAuth provider! | -| OAUTH_TIMEOUT | No | 3500 | The wait time in milliseconds for the OAuth provider response. Increase this if you are having `outgoing request timed out` errors | +| OAUTH_TIMEOUT | No | 3500 | The wait time in milliseconds for the OAuth provider response. Increase this if you are having `outgoing request timed out` errors | For more information on `OAUTH_ALLOW_DANGEROUS_EMAIL_ACCOUNT_LINKING`, check the [next-auth.js documentation](https://next-auth.js.org/configuration/providers/oauth#allowdangerousemailaccountlinking-option). 
@@ -61,6 +61,8 @@ Either `OPENAI_API_KEY` or `OLLAMA_BASE_URL` need to be set for automatic taggin | EMBEDDING_TEXT_MODEL | No | text-embedding-3-small | The model to be used for generating embeddings for the text. | | INFERENCE_CONTEXT_LENGTH | No | 2048 | The max number of tokens that we'll pass to the inference model. If your content is larger than this size, it'll be truncated to fit. The larger this value, the more of the content will be used in tag inference, but the more expensive the inference will be (money-wise on openAI and resource-wise on ollama). Check the model you're using for its max supported content size. | | INFERENCE_LANG | No | english | The language in which the tags will be generated. | +| INFERENCE_ENABLE_AUTO_TAGGING | No | true | Whether automatic AI tagging is enabled or disabled. | +| INFERENCE_ENABLE_AUTO_SUMMARIZATION | No | false | Whether automatic AI summarization is enabled or disabled. | | INFERENCE_JOB_TIMEOUT_SEC | No | 30 | How long to wait for the inference job to finish before timing out. If you're running ollama without powerful GPUs, you might want to increase the timeout a bit. | | INFERENCE_FETCH_TIMEOUT_SEC | No | 300 | \[Ollama Only\] The timeout of the fetch request to the ollama server. If your inference requests take longer than the default 5mins, you might want to increase this timeout. | | INFERENCE_SUPPORTS_STRUCTURED_OUTPUT | No | Not set | \[DEPRECATED\] Whether the inference model supports structured output or not. Use INFERENCE_OUTPUT_SCHEMA instead. Setting this to true translates to INFERENCE_OUTPUT_SCHEMA=structured, and to false translates to INFERENCE_OUTPUT_SCHEMA=plain. 
| diff --git a/packages/db/drizzle/0047_add_summarization_status.sql b/packages/db/drizzle/0047_add_summarization_status.sql new file mode 100644 index 00000000..6e8b40d0 --- /dev/null +++ b/packages/db/drizzle/0047_add_summarization_status.sql @@ -0,0 +1 @@ +ALTER TABLE `bookmarks` ADD `summarizationStatus` text DEFAULT 'pending'; \ No newline at end of file diff --git a/packages/db/drizzle/meta/0047_snapshot.json b/packages/db/drizzle/meta/0047_snapshot.json new file mode 100644 index 00000000..f6908043 --- /dev/null +++ b/packages/db/drizzle/meta/0047_snapshot.json @@ -0,0 +1,1967 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "cd8ff1a8-c7bb-4576-9bec-2644785c894e", + "prevId": "7a950001-03e7-43f2-8ef5-272dfe82d223", + "tables": { + "account": { + "name": "account", + "columns": { + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "provider": { + "name": "provider", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "providerAccountId": { + "name": "providerAccountId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "refresh_token": { + "name": "refresh_token", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "access_token": { + "name": "access_token", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expires_at": { + "name": "expires_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "token_type": { + "name": "token_type", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "scope": { + "name": "scope", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + 
"id_token": { + "name": "id_token", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "session_state": { + "name": "session_state", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "account_userId_user_id_fk": { + "name": "account_userId_user_id_fk", + "tableFrom": "account", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "account_provider_providerAccountId_pk": { + "columns": [ + "provider", + "providerAccountId" + ], + "name": "account_provider_providerAccountId_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "apiKey": { + "name": "apiKey", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "keyId": { + "name": "keyId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "keyHash": { + "name": "keyHash", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "apiKey_keyId_unique": { + "name": "apiKey_keyId_unique", + "columns": [ + "keyId" + ], + "isUnique": true + }, + "apiKey_name_userId_unique": { + "name": "apiKey_name_userId_unique", + "columns": [ + "name", + "userId" + ], + "isUnique": true + } + }, + "foreignKeys": { + "apiKey_userId_user_id_fk": { + "name": "apiKey_userId_user_id_fk", + "tableFrom": "apiKey", + "tableTo": "user", + 
"columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "assets": { + "name": "assets", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "assetType": { + "name": "assetType", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "size": { + "name": "size", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "contentType": { + "name": "contentType", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "fileName": { + "name": "fileName", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "bookmarkId": { + "name": "bookmarkId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "assets_bookmarkId_idx": { + "name": "assets_bookmarkId_idx", + "columns": [ + "bookmarkId" + ], + "isUnique": false + }, + "assets_assetType_idx": { + "name": "assets_assetType_idx", + "columns": [ + "assetType" + ], + "isUnique": false + }, + "assets_userId_idx": { + "name": "assets_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "assets_bookmarkId_bookmarks_id_fk": { + "name": "assets_bookmarkId_bookmarks_id_fk", + "tableFrom": "assets", + "tableTo": "bookmarks", + "columnsFrom": [ + "bookmarkId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "assets_userId_user_id_fk": { + "name": "assets_userId_user_id_fk", + "tableFrom": "assets", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" 
+ ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarkAssets": { + "name": "bookmarkAssets", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "assetType": { + "name": "assetType", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "assetId": { + "name": "assetId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "content": { + "name": "content", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "metadata": { + "name": "metadata", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "fileName": { + "name": "fileName", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "sourceUrl": { + "name": "sourceUrl", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "bookmarkAssets_id_bookmarks_id_fk": { + "name": "bookmarkAssets_id_bookmarks_id_fk", + "tableFrom": "bookmarkAssets", + "tableTo": "bookmarks", + "columnsFrom": [ + "id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarkLinks": { + "name": "bookmarkLinks", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "url": { + "name": "url", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": 
false, + "notNull": false, + "autoincrement": false + }, + "author": { + "name": "author", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "publisher": { + "name": "publisher", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "datePublished": { + "name": "datePublished", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "dateModified": { + "name": "dateModified", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "imageUrl": { + "name": "imageUrl", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "favicon": { + "name": "favicon", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "content": { + "name": "content", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "htmlContent": { + "name": "htmlContent", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "crawledAt": { + "name": "crawledAt", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "crawlStatus": { + "name": "crawlStatus", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'pending'" + }, + "crawlStatusCode": { + "name": "crawlStatusCode", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": 200 + } + }, + "indexes": { + "bookmarkLinks_url_idx": { + "name": "bookmarkLinks_url_idx", + "columns": [ + "url" + ], + "isUnique": false + } + }, + "foreignKeys": { + "bookmarkLinks_id_bookmarks_id_fk": { + "name": "bookmarkLinks_id_bookmarks_id_fk", + "tableFrom": "bookmarkLinks", + "tableTo": "bookmarks", + "columnsFrom": [ + "id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + 
"compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarkLists": { + "name": "bookmarkLists", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "icon": { + "name": "icon", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "query": { + "name": "query", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "parentId": { + "name": "parentId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "bookmarkLists_userId_idx": { + "name": "bookmarkLists_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + }, + "bookmarkLists_userId_id_idx": { + "name": "bookmarkLists_userId_id_idx", + "columns": [ + "userId", + "id" + ], + "isUnique": true + } + }, + "foreignKeys": { + "bookmarkLists_userId_user_id_fk": { + "name": "bookmarkLists_userId_user_id_fk", + "tableFrom": "bookmarkLists", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "bookmarkLists_parentId_bookmarkLists_id_fk": { + "name": "bookmarkLists_parentId_bookmarkLists_id_fk", + "tableFrom": "bookmarkLists", + "tableTo": "bookmarkLists", 
+ "columnsFrom": [ + "parentId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarkTags": { + "name": "bookmarkTags", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "bookmarkTags_name_idx": { + "name": "bookmarkTags_name_idx", + "columns": [ + "name" + ], + "isUnique": false + }, + "bookmarkTags_userId_idx": { + "name": "bookmarkTags_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + }, + "bookmarkTags_userId_name_unique": { + "name": "bookmarkTags_userId_name_unique", + "columns": [ + "userId", + "name" + ], + "isUnique": true + }, + "bookmarkTags_userId_id_idx": { + "name": "bookmarkTags_userId_id_idx", + "columns": [ + "userId", + "id" + ], + "isUnique": true + } + }, + "foreignKeys": { + "bookmarkTags_userId_user_id_fk": { + "name": "bookmarkTags_userId_user_id_fk", + "tableFrom": "bookmarkTags", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarkTexts": { + "name": "bookmarkTexts", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "text": { + "name": "text", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "sourceUrl": { + 
"name": "sourceUrl", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "bookmarkTexts_id_bookmarks_id_fk": { + "name": "bookmarkTexts_id_bookmarks_id_fk", + "tableFrom": "bookmarkTexts", + "tableTo": "bookmarks", + "columnsFrom": [ + "id" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarks": { + "name": "bookmarks", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "modifiedAt": { + "name": "modifiedAt", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "title": { + "name": "title", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "archived": { + "name": "archived", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "favourited": { + "name": "favourited", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "taggingStatus": { + "name": "taggingStatus", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'pending'" + }, + "summarizationStatus": { + "name": "summarizationStatus", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'pending'" + }, + "summary": { + "name": "summary", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "note": { + "name": "note", + 
"type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "type": { + "name": "type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "bookmarks_userId_idx": { + "name": "bookmarks_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + }, + "bookmarks_archived_idx": { + "name": "bookmarks_archived_idx", + "columns": [ + "archived" + ], + "isUnique": false + }, + "bookmarks_favourited_idx": { + "name": "bookmarks_favourited_idx", + "columns": [ + "favourited" + ], + "isUnique": false + }, + "bookmarks_createdAt_idx": { + "name": "bookmarks_createdAt_idx", + "columns": [ + "createdAt" + ], + "isUnique": false + } + }, + "foreignKeys": { + "bookmarks_userId_user_id_fk": { + "name": "bookmarks_userId_user_id_fk", + "tableFrom": "bookmarks", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "bookmarksInLists": { + "name": "bookmarksInLists", + "columns": { + "bookmarkId": { + "name": "bookmarkId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "listId": { + "name": "listId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "addedAt": { + "name": "addedAt", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "bookmarksInLists_bookmarkId_idx": { + "name": "bookmarksInLists_bookmarkId_idx", + "columns": [ + "bookmarkId" + ], + "isUnique": false + }, + "bookmarksInLists_listId_idx": { + "name": "bookmarksInLists_listId_idx", + "columns": [ + "listId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "bookmarksInLists_bookmarkId_bookmarks_id_fk": { + "name": "bookmarksInLists_bookmarkId_bookmarks_id_fk", + "tableFrom": 
"bookmarksInLists", + "tableTo": "bookmarks", + "columnsFrom": [ + "bookmarkId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "bookmarksInLists_listId_bookmarkLists_id_fk": { + "name": "bookmarksInLists_listId_bookmarkLists_id_fk", + "tableFrom": "bookmarksInLists", + "tableTo": "bookmarkLists", + "columnsFrom": [ + "listId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "bookmarksInLists_bookmarkId_listId_pk": { + "columns": [ + "bookmarkId", + "listId" + ], + "name": "bookmarksInLists_bookmarkId_listId_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "config": { + "name": "config", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "customPrompts": { + "name": "customPrompts", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "text": { + "name": "text", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "appliesTo": { + "name": "appliesTo", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "customPrompts_userId_idx": { + "name": 
"customPrompts_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "customPrompts_userId_user_id_fk": { + "name": "customPrompts_userId_user_id_fk", + "tableFrom": "customPrompts", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "highlights": { + "name": "highlights", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "bookmarkId": { + "name": "bookmarkId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "startOffset": { + "name": "startOffset", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "endOffset": { + "name": "endOffset", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "color": { + "name": "color", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "'yellow'" + }, + "text": { + "name": "text", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "note": { + "name": "note", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "highlights_bookmarkId_idx": { + "name": "highlights_bookmarkId_idx", + "columns": [ + "bookmarkId" + ], + "isUnique": false + }, + "highlights_userId_idx": { + "name": "highlights_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + } + }, + "foreignKeys": { + 
"highlights_bookmarkId_bookmarks_id_fk": { + "name": "highlights_bookmarkId_bookmarks_id_fk", + "tableFrom": "highlights", + "tableTo": "bookmarks", + "columnsFrom": [ + "bookmarkId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "highlights_userId_user_id_fk": { + "name": "highlights_userId_user_id_fk", + "tableFrom": "highlights", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "rssFeedImports": { + "name": "rssFeedImports", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "entryId": { + "name": "entryId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "rssFeedId": { + "name": "rssFeedId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "bookmarkId": { + "name": "bookmarkId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "rssFeedImports_feedIdIdx_idx": { + "name": "rssFeedImports_feedIdIdx_idx", + "columns": [ + "rssFeedId" + ], + "isUnique": false + }, + "rssFeedImports_entryIdIdx_idx": { + "name": "rssFeedImports_entryIdIdx_idx", + "columns": [ + "entryId" + ], + "isUnique": false + }, + "rssFeedImports_rssFeedId_entryId_unique": { + "name": "rssFeedImports_rssFeedId_entryId_unique", + "columns": [ + "rssFeedId", + "entryId" + ], + "isUnique": true + } + }, + "foreignKeys": { + "rssFeedImports_rssFeedId_rssFeeds_id_fk": { + "name": "rssFeedImports_rssFeedId_rssFeeds_id_fk", + "tableFrom": "rssFeedImports", + "tableTo": "rssFeeds", + "columnsFrom": [ + "rssFeedId" + ], + 
"columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "rssFeedImports_bookmarkId_bookmarks_id_fk": { + "name": "rssFeedImports_bookmarkId_bookmarks_id_fk", + "tableFrom": "rssFeedImports", + "tableTo": "bookmarks", + "columnsFrom": [ + "bookmarkId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "rssFeeds": { + "name": "rssFeeds", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "url": { + "name": "url", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": true + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "lastFetchedAt": { + "name": "lastFetchedAt", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "lastFetchedStatus": { + "name": "lastFetchedStatus", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'pending'" + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "rssFeeds_userId_idx": { + "name": "rssFeeds_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "rssFeeds_userId_user_id_fk": { + "name": "rssFeeds_userId_user_id_fk", + "tableFrom": "rssFeeds", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + 
"compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "ruleEngineActions": { + "name": "ruleEngineActions", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "ruleId": { + "name": "ruleId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "action": { + "name": "action", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "listId": { + "name": "listId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "tagId": { + "name": "tagId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "ruleEngineActions_userId_idx": { + "name": "ruleEngineActions_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + }, + "ruleEngineActions_ruleId_idx": { + "name": "ruleEngineActions_ruleId_idx", + "columns": [ + "ruleId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "ruleEngineActions_userId_user_id_fk": { + "name": "ruleEngineActions_userId_user_id_fk", + "tableFrom": "ruleEngineActions", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "ruleEngineActions_ruleId_ruleEngineRules_id_fk": { + "name": "ruleEngineActions_ruleId_ruleEngineRules_id_fk", + "tableFrom": "ruleEngineActions", + "tableTo": "ruleEngineRules", + "columnsFrom": [ + "ruleId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "ruleEngineActions_userId_tagId_fk": { + "name": "ruleEngineActions_userId_tagId_fk", + "tableFrom": "ruleEngineActions", + "tableTo": "bookmarkTags", + "columnsFrom": [ + "userId", + "tagId" + ], + "columnsTo": [ + "userId", 
+ "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "ruleEngineActions_userId_listId_fk": { + "name": "ruleEngineActions_userId_listId_fk", + "tableFrom": "ruleEngineActions", + "tableTo": "bookmarkLists", + "columnsFrom": [ + "userId", + "listId" + ], + "columnsTo": [ + "userId", + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "ruleEngineRules": { + "name": "ruleEngineRules", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "enabled": { + "name": "enabled", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": true + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "description": { + "name": "description", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "event": { + "name": "event", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "condition": { + "name": "condition", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "listId": { + "name": "listId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "tagId": { + "name": "tagId", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "ruleEngine_userId_idx": { + "name": "ruleEngine_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "ruleEngineRules_userId_user_id_fk": { + "name": "ruleEngineRules_userId_user_id_fk", + "tableFrom": "ruleEngineRules", + "tableTo": "user", + "columnsFrom": [ + 
"userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "ruleEngineRules_userId_tagId_fk": { + "name": "ruleEngineRules_userId_tagId_fk", + "tableFrom": "ruleEngineRules", + "tableTo": "bookmarkTags", + "columnsFrom": [ + "userId", + "tagId" + ], + "columnsTo": [ + "userId", + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "ruleEngineRules_userId_listId_fk": { + "name": "ruleEngineRules_userId_listId_fk", + "tableFrom": "ruleEngineRules", + "tableTo": "bookmarkLists", + "columnsFrom": [ + "userId", + "listId" + ], + "columnsTo": [ + "userId", + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "session": { + "name": "session", + "columns": { + "sessionToken": { + "name": "sessionToken", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expires": { + "name": "expires", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": { + "session_userId_user_id_fk": { + "name": "session_userId_user_id_fk", + "tableFrom": "session", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "tagsOnBookmarks": { + "name": "tagsOnBookmarks", + "columns": { + "bookmarkId": { + "name": "bookmarkId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "tagId": { + "name": "tagId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "attachedAt": { + "name": "attachedAt", + "type": "integer", + "primaryKey": false, + 
"notNull": false, + "autoincrement": false + }, + "attachedBy": { + "name": "attachedBy", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": { + "tagsOnBookmarks_tagId_idx": { + "name": "tagsOnBookmarks_tagId_idx", + "columns": [ + "tagId" + ], + "isUnique": false + }, + "tagsOnBookmarks_bookmarkId_idx": { + "name": "tagsOnBookmarks_bookmarkId_idx", + "columns": [ + "bookmarkId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "tagsOnBookmarks_bookmarkId_bookmarks_id_fk": { + "name": "tagsOnBookmarks_bookmarkId_bookmarks_id_fk", + "tableFrom": "tagsOnBookmarks", + "tableTo": "bookmarks", + "columnsFrom": [ + "bookmarkId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "tagsOnBookmarks_tagId_bookmarkTags_id_fk": { + "name": "tagsOnBookmarks_tagId_bookmarkTags_id_fk", + "tableFrom": "tagsOnBookmarks", + "tableTo": "bookmarkTags", + "columnsFrom": [ + "tagId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "tagsOnBookmarks_bookmarkId_tagId_pk": { + "columns": [ + "bookmarkId", + "tagId" + ], + "name": "tagsOnBookmarks_bookmarkId_tagId_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "user": { + "name": "user", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "name": { + "name": "name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "email": { + "name": "email", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "emailVerified": { + "name": "emailVerified", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "image": { + "name": "image", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "password": { + "name": 
"password", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "salt": { + "name": "salt", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": "''" + }, + "role": { + "name": "role", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false, + "default": "'user'" + } + }, + "indexes": { + "user_email_unique": { + "name": "user_email_unique", + "columns": [ + "email" + ], + "isUnique": true + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "verificationToken": { + "name": "verificationToken", + "columns": { + "identifier": { + "name": "identifier", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expires": { + "name": "expires", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": { + "verificationToken_identifier_token_pk": { + "columns": [ + "identifier", + "token" + ], + "name": "verificationToken_identifier_token_pk" + } + }, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "webhooks": { + "name": "webhooks", + "columns": { + "id": { + "name": "id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "createdAt": { + "name": "createdAt", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "url": { + "name": "url", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "userId": { + "name": "userId", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "events": { + "name": "events", + "type": "text", + "primaryKey": false, + "notNull": true, + 
"autoincrement": false + }, + "token": { + "name": "token", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "webhooks_userId_idx": { + "name": "webhooks_userId_idx", + "columns": [ + "userId" + ], + "isUnique": false + } + }, + "foreignKeys": { + "webhooks_userId_user_id_fk": { + "name": "webhooks_userId_user_id_fk", + "tableFrom": "webhooks", + "tableTo": "user", + "columnsFrom": [ + "userId" + ], + "columnsTo": [ + "id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} \ No newline at end of file diff --git a/packages/db/drizzle/meta/_journal.json b/packages/db/drizzle/meta/_journal.json index 9e89f54c..0d7870de 100644 --- a/packages/db/drizzle/meta/_journal.json +++ b/packages/db/drizzle/meta/_journal.json @@ -330,6 +330,13 @@ "when": 1746902541511, "tag": "0046_add_rss_feed_enabled_col", "breakpoints": true + }, + { + "idx": 47, + "version": "6", + "when": 1747598543992, + "tag": "0047_add_summarization_status", + "breakpoints": true } ] } \ No newline at end of file diff --git a/packages/db/schema.ts b/packages/db/schema.ts index 6cfca10c..b5938989 100644 --- a/packages/db/schema.ts +++ b/packages/db/schema.ts @@ -123,6 +123,9 @@ export const bookmarks = sqliteTable( taggingStatus: text("taggingStatus", { enum: ["pending", "failure", "success"], }).default("pending"), + summarizationStatus: text("summarizationStatus", { + enum: ["pending", "failure", "success"], + }).default("pending"), summary: text("summary"), note: text("note"), type: text("type", { diff --git a/packages/shared-react/utils/bookmarkUtils.ts b/packages/shared-react/utils/bookmarkUtils.ts index 1f840f78..08a6a5e9 100644 --- a/packages/shared-react/utils/bookmarkUtils.ts +++ 
b/packages/shared-react/utils/bookmarkUtils.ts @@ -35,8 +35,19 @@ export function isBookmarkStillTagging(bookmark: ZBookmark) { ); } +export function isBookmarkStillSummarizing(bookmark: ZBookmark) { + return ( + bookmark.summarizationStatus == "pending" && + Date.now().valueOf() - bookmark.createdAt.valueOf() < MAX_LOADING_MSEC + ); +} + export function isBookmarkStillLoading(bookmark: ZBookmark) { - return isBookmarkStillTagging(bookmark) || isBookmarkStillCrawling(bookmark); + return ( + isBookmarkStillTagging(bookmark) || + isBookmarkStillCrawling(bookmark) || + isBookmarkStillSummarizing(bookmark) + ); } export function getSourceUrl(bookmark: ZBookmark) { diff --git a/packages/shared/config.ts b/packages/shared/config.ts index 046583c6..a4548348 100644 --- a/packages/shared/config.ts +++ b/packages/shared/config.ts @@ -40,6 +40,8 @@ const allEnv = z.object({ INFERENCE_OUTPUT_SCHEMA: z .enum(["structured", "json", "plain"]) .default("structured"), + INFERENCE_ENABLE_AUTO_TAGGING: stringBool("true"), + INFERENCE_ENABLE_AUTO_SUMMARIZATION: stringBool("false"), OCR_CACHE_DIR: z.string().optional(), OCR_LANGS: z .string() @@ -120,6 +122,8 @@ const serverConfigSchema = allEnv.transform((val) => { ? 
("structured" as const) : ("plain" as const) : val.INFERENCE_OUTPUT_SCHEMA, + enableAutoTagging: val.INFERENCE_ENABLE_AUTO_TAGGING, + enableAutoSummarization: val.INFERENCE_ENABLE_AUTO_SUMMARIZATION, }, embedding: { textModel: val.EMBEDDING_TEXT_MODEL, diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts index 571df568..bbf69428 100644 --- a/packages/shared/queues.ts +++ b/packages/shared/queues.ts @@ -32,9 +32,10 @@ export const LinkCrawlerQueue = new SqliteQueue( }, ); -// OpenAI Worker +// Inference Worker export const zOpenAIRequestSchema = z.object({ bookmarkId: z.string(), + type: z.enum(["summarize", "tag"]).default("tag"), }); export type ZOpenAIRequest = z.infer; @@ -195,7 +196,7 @@ export async function triggerWebhook( }); } -// RuleEgine worker +// RuleEngine worker export const zRuleEngineRequestSchema = z.object({ bookmarkId: z.string(), events: z.array(zRuleEngineEventSchema), diff --git a/packages/shared/types/bookmarks.ts b/packages/shared/types/bookmarks.ts index 709fd431..5fe77278 100644 --- a/packages/shared/types/bookmarks.ts +++ b/packages/shared/types/bookmarks.ts @@ -87,6 +87,7 @@ export const zBareBookmarkSchema = z.object({ archived: z.boolean(), favourited: z.boolean(), taggingStatus: z.enum(["success", "failure", "pending"]).nullable(), + summarizationStatus: z.enum(["success", "failure", "pending"]).nullable(), note: z.string().nullish(), summary: z.string().nullish(), }); diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts index e4985b5c..91f4a34f 100644 --- a/packages/trpc/routers/admin.ts +++ b/packages/trpc/routers/admin.ts @@ -1,5 +1,5 @@ import { TRPCError } from "@trpc/server"; -import { count, eq, sum } from "drizzle-orm"; +import { count, eq, or, sum } from "drizzle-orm"; import { z } from "zod"; import { assets, bookmarkLinks, bookmarks, users } from "@karakeep/db/schema"; @@ -129,11 +129,21 @@ export const adminAppRouter = router({ ctx.db .select({ value: count() }) .from(bookmarks) - 
.where(eq(bookmarks.taggingStatus, "pending")), + .where( + or( + eq(bookmarks.taggingStatus, "pending"), + eq(bookmarks.summarizationStatus, "pending"), + ), + ), ctx.db .select({ value: count() }) .from(bookmarks) - .where(eq(bookmarks.taggingStatus, "failure")), + .where( + or( + eq(bookmarks.taggingStatus, "failure"), + eq(bookmarks.summarizationStatus, "failure"), + ), + ), // Tidy Assets TidyAssetsQueue.stats(), @@ -233,7 +243,8 @@ export const adminAppRouter = router({ reRunInferenceOnAllBookmarks: adminProcedure .input( z.object({ - taggingStatus: z.enum(["success", "failure", "all"]), + type: z.enum(["tag", "summarize"]), + status: z.enum(["success", "failure", "all"]), }), ) .mutation(async ({ input, ctx }) => { @@ -241,13 +252,22 @@ export const adminAppRouter = router({ columns: { id: true, }, - ...(input.taggingStatus === "all" - ? {} - : { where: eq(bookmarks.taggingStatus, input.taggingStatus) }), + ...{ + tag: + input.status === "all" + ? {} + : { where: eq(bookmarks.taggingStatus, input.status) }, + summarize: + input.status === "all" + ? {} + : { where: eq(bookmarks.summarizationStatus, input.status) }, + }[input.type], }); await Promise.all( - bookmarkIds.map((b) => OpenAIQueue.enqueue({ bookmarkId: b.id })), + bookmarkIds.map((b) => + OpenAIQueue.enqueue({ bookmarkId: b.id, type: input.type }), + ), ); }), tidyAssets: adminProcedure.mutation(async () => { diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 88386657..de5bd4c2 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -420,6 +420,7 @@ export const bookmarksAppRouter = router({ case BookmarkTypes.TEXT: { await OpenAIQueue.enqueue({ bookmarkId: bookmark.id, + type: "tag", }); break; } -- cgit v1.3-1-g0d28