From 2743d9e38ecfdbf757d4d2f97bcf09d601245b59 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 18 May 2025 20:22:59 +0000 Subject: feat: Add AI auto summarization. Fixes #1163 --- apps/workers/assetPreprocessingWorker.ts | 338 ------------------------------- 1 file changed, 338 deletions(-) delete mode 100644 apps/workers/assetPreprocessingWorker.ts (limited to 'apps/workers/assetPreprocessingWorker.ts') diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/assetPreprocessingWorker.ts deleted file mode 100644 index a678b706..00000000 --- a/apps/workers/assetPreprocessingWorker.ts +++ /dev/null @@ -1,338 +0,0 @@ -import os from "os"; -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import PDFParser from "pdf2json"; -import { fromBuffer } from "pdf2pic"; -import { createWorker } from "tesseract.js"; - -import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - assets, - AssetTypes, - bookmarkAssets, - bookmarks, -} from "@karakeep/db/schema"; -import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - AssetPreprocessingQueue, - OpenAIQueue, - triggerSearchReindex, -} from "@karakeep/shared/queues"; - -export class AssetPreprocessingWorker { - static build() { - logger.info("Starting asset preprocessing worker ..."); - const worker = new Runner( - AssetPreprocessingQueue, - { - run: run, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function readImageText(buffer: Buffer) { - if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") { - return null; - } - const worker = await createWorker(serverConfig.ocr.langs, undefined, { - cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(), - }); - try { - const ret = await worker.recognize(buffer); - if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) { - return null; - } - return ret.data.text; - } finally { - await worker.terminate(); - } -} - -async function readPDFText(buffer: Buffer): Promise<{ - text: string; - metadata: Record; -}> { - return new Promise((resolve, reject) => { - const pdfParser = new PDFParser(null, true); - pdfParser.on("pdfParser_dataError", reject); - pdfParser.on("pdfParser_dataReady", (pdfData) => { - resolve({ - text: pdfParser.getRawTextContent(), - metadata: pdfData.Meta, - }); - }); - pdfParser.parseBuffer(buffer); - }); -} - -export async function extractAndSavePDFScreenshot( - jobId: string, - asset: Buffer, - bookmark: NonNullable>>, - isFixMode: boolean, -): Promise { - { - const alreadyHasScreenshot = - bookmark.assets.find( - (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT, - ) !== undefined; - if (alreadyHasScreenshot && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`, - ); - return false; - } - } - logger.info( - `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`, - ); - try { - /** - * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL - * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md - */ - const screenshot = await fromBuffer(asset, { - density: 100, - quality: 100, - format: "png", - preserveAspectRatio: true, - })(1, { responseType: "buffer" }); - - if (!screenshot.buffer) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`, - ); - return false; - } - - // Store the screenshot - const assetId = newAssetId(); - const fileName = "screenshot.png"; - const contentType = "image/png"; - await saveAsset({ - userId: bookmark.userId, - assetId, - asset: screenshot.buffer, - metadata: { - contentType, - fileName, - }, - }); - - // Insert into database - await db.insert(assets).values({ - id: assetId, - bookmarkId: bookmark.id, - userId: bookmark.userId, - assetType: AssetTypes.ASSET_SCREENSHOT, - contentType, - size: screenshot.buffer.byteLength, - fileName, - }); - - logger.info( - `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`, - ); - return true; - } catch (error) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, - ); - return false; - } -} - -async function extractAndSaveImageText( - jobId: string, - asset: Buffer, - bookmark: NonNullable>>, - isFixMode: boolean, -): Promise { - { - const alreadyHasText = !!bookmark.asset.content; - if (alreadyHasText && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`, - ); - return false; - } - } - let imageText = null; - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, - ); - try { - imageText = await readImageText(asset); - } catch (e) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, - ); - } - if (!imageText) { - return false; - } - - logger.info( - `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`, - ); - await db - .update(bookmarkAssets) - .set({ - content: imageText, - metadata: null, - }) - .where(eq(bookmarkAssets.id, bookmark.id)); - return true; -} - -async function extractAndSavePDFText( - jobId: string, - asset: Buffer, - bookmark: NonNullable>>, - isFixMode: boolean, -): Promise { - { - const alreadyHasText = !!bookmark.asset.content; - if (alreadyHasText && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`, - ); - return false; - } - } - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`, - ); - const pdfParse = await readPDFText(asset); - if (!pdfParse?.text) { - throw new Error( - `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`, - ); - } - logger.info( - `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`, - ); - await db - .update(bookmarkAssets) - .set({ - content: pdfParse.text, - metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null, - }) - .where(eq(bookmarkAssets.id, bookmark.id)); - return true; -} - -async function getBookmark(bookmarkId: string) { - return db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - asset: true, - assets: true, - }, - }); -} - -async function run(req: DequeuedJob) { - const isFixMode = req.data.fixMode; - const jobId = req.id; - const bookmarkId = req.data.bookmarkId; - - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - asset: true, - assets: true, - }, - }); - - logger.info( - `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`, - ); - - if (!bookmark) { - throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`); - } - - if (!bookmark.asset) { - throw new Error( - `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`, - ); - } - - const { asset } = await readAsset({ - userId: bookmark.userId, - assetId: bookmark.asset.assetId, - }); - - if (!asset) { - throw new Error( - `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`, - ); - } - - let anythingChanged = false; - switch (bookmark.asset.assetType) { - case "image": { - const extractedText = await extractAndSaveImageText( - jobId, - asset, - bookmark, - isFixMode, - ); - anythingChanged ||= extractedText; - break; - } - case "pdf": { - const extractedText = await extractAndSavePDFText( - jobId, - asset, - bookmark, - isFixMode, - ); - const extractedScreenshot = await extractAndSavePDFScreenshot( - jobId, - asset, - bookmark, - isFixMode, - ); - anythingChanged ||= extractedText || extractedScreenshot; - break; - } - default: - throw new Error( - `[assetPreprocessing][${jobId}] Unsupported bookmark type`, - ); - } - - if (!isFixMode || anythingChanged) { - await OpenAIQueue.enqueue({ - bookmarkId, - }); - - // Update the search index - await triggerSearchReindex(bookmarkId); - } -} -- cgit v1.2.3-70-g09d2