diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-05-18 20:22:59 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-05-18 20:22:59 +0000 |
| commit | 2743d9e38ecfdbf757d4d2f97bcf09d601245b59 (patch) | |
| tree | 3119581aafce5321aaba9719ba3b2597d000d564 /apps/workers/assetPreprocessingWorker.ts | |
| parent | a5ae67c241d8cdd452acd4d98800ec61740c041f (diff) | |
| download | karakeep-2743d9e38ecfdbf757d4d2f97bcf09d601245b59.tar.zst | |
feat: Add AI auto summarization. Fixes #1163
Diffstat (limited to 'apps/workers/assetPreprocessingWorker.ts')
| -rw-r--r-- | apps/workers/assetPreprocessingWorker.ts | 338 |
1 files changed, 0 insertions, 338 deletions
diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/assetPreprocessingWorker.ts deleted file mode 100644 index a678b706..00000000 --- a/apps/workers/assetPreprocessingWorker.ts +++ /dev/null @@ -1,338 +0,0 @@ -import os from "os"; -import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; -import PDFParser from "pdf2json"; -import { fromBuffer } from "pdf2pic"; -import { createWorker } from "tesseract.js"; - -import type { AssetPreprocessingRequest } from "@karakeep/shared/queues"; -import { db } from "@karakeep/db"; -import { - assets, - AssetTypes, - bookmarkAssets, - bookmarks, -} from "@karakeep/db/schema"; -import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb"; -import serverConfig from "@karakeep/shared/config"; -import logger from "@karakeep/shared/logger"; -import { - AssetPreprocessingQueue, - OpenAIQueue, - triggerSearchReindex, -} from "@karakeep/shared/queues"; - -export class AssetPreprocessingWorker { - static build() { - logger.info("Starting asset preprocessing worker ..."); - const worker = new Runner<AssetPreprocessingRequest>( - AssetPreprocessingQueue, - { - run: run, - onComplete: async (job) => { - const jobId = job.id; - logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); - return Promise.resolve(); - }, - onError: async (job) => { - const jobId = job.id; - logger.error( - `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, - ); - return Promise.resolve(); - }, - }, - { - concurrency: 1, - pollIntervalMs: 1000, - timeoutSecs: 30, - }, - ); - - return worker; - } -} - -async function readImageText(buffer: Buffer) { - if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") { - return null; - } - const worker = await createWorker(serverConfig.ocr.langs, undefined, { - cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(), - }); - try { - const ret = await worker.recognize(buffer); - if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) { - return null; - } - return ret.data.text; - } finally { - await worker.terminate(); - } -} - -async function readPDFText(buffer: Buffer): Promise<{ - text: string; - metadata: Record<string, object>; -}> { - return new Promise((resolve, reject) => { - const pdfParser = new PDFParser(null, true); - pdfParser.on("pdfParser_dataError", reject); - pdfParser.on("pdfParser_dataReady", (pdfData) => { - resolve({ - text: pdfParser.getRawTextContent(), - metadata: pdfData.Meta, - }); - }); - pdfParser.parseBuffer(buffer); - }); -} - -export async function extractAndSavePDFScreenshot( - jobId: string, - asset: Buffer, - bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>, - isFixMode: boolean, -): Promise<boolean> { - { - const alreadyHasScreenshot = - bookmark.assets.find( - (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT, - ) !== undefined; - if (alreadyHasScreenshot && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`, - ); - return false; - } - } - logger.info( - `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`, - ); - try { - /** - * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL - * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md - */ - const screenshot = await fromBuffer(asset, { - density: 100, - quality: 100, - format: "png", - preserveAspectRatio: true, - })(1, { responseType: "buffer" }); - - if (!screenshot.buffer) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`, - ); - return false; - } - - // Store the screenshot - const assetId = newAssetId(); - const fileName = "screenshot.png"; - const contentType = "image/png"; - await saveAsset({ - userId: bookmark.userId, - assetId, - asset: screenshot.buffer, - metadata: { - contentType, - fileName, - }, - }); - - // Insert into database - await db.insert(assets).values({ - id: assetId, - bookmarkId: bookmark.id, - userId: bookmark.userId, - assetType: AssetTypes.ASSET_SCREENSHOT, - contentType, - size: screenshot.buffer.byteLength, - fileName, - }); - - logger.info( - `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`, - ); - return true; - } catch (error) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, - ); - return false; - } -} - -async function extractAndSaveImageText( - jobId: string, - asset: Buffer, - bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>, - isFixMode: boolean, -): Promise<boolean> { - { - const alreadyHasText = !!bookmark.asset.content; - if (alreadyHasText && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`, - ); - return false; - } - } - let imageText = null; - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, - ); - try { - imageText = await readImageText(asset); - } catch (e) { - logger.error( - `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, - ); - } - if (!imageText) { - return false; - } - - logger.info( - `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`, - ); - await db - .update(bookmarkAssets) - .set({ - content: imageText, - metadata: null, - }) - .where(eq(bookmarkAssets.id, bookmark.id)); - return true; -} - -async function extractAndSavePDFText( - jobId: string, - asset: Buffer, - bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>, - isFixMode: boolean, -): Promise<boolean> { - { - const alreadyHasText = !!bookmark.asset.content; - if (alreadyHasText && isFixMode) { - logger.info( - `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`, - ); - return false; - } - } - logger.info( - `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`, - ); - const pdfParse = await readPDFText(asset); - if (!pdfParse?.text) { - throw new Error( - `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`, - ); - } - logger.info( - `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`, - ); - await db - .update(bookmarkAssets) - .set({ - content: pdfParse.text, - metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null, - }) - .where(eq(bookmarkAssets.id, bookmark.id)); - return true; -} - -async function getBookmark(bookmarkId: string) { - return db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - asset: true, - assets: true, - }, - }); -} - -async function run(req: DequeuedJob<AssetPreprocessingRequest>) { - const isFixMode = req.data.fixMode; - const jobId = req.id; - const bookmarkId = req.data.bookmarkId; - - const bookmark = await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, bookmarkId), - with: { - asset: true, - assets: true, - }, - }); - - logger.info( - `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`, - ); - - if (!bookmark) { - throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`); - } - - if (!bookmark.asset) { - throw new Error( - `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`, - ); - } - - const { asset } = await readAsset({ - userId: bookmark.userId, - assetId: bookmark.asset.assetId, - }); - - if (!asset) { - throw new Error( - `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`, - ); - } - - let anythingChanged = false; - switch (bookmark.asset.assetType) { - case "image": { - const extractedText = await extractAndSaveImageText( - jobId, - asset, - bookmark, - isFixMode, - ); - anythingChanged ||= extractedText; - break; - } - case "pdf": { - const extractedText = await extractAndSavePDFText( - jobId, - asset, - bookmark, - isFixMode, - ); - const extractedScreenshot = await extractAndSavePDFScreenshot( - jobId, - asset, - bookmark, - isFixMode, - ); - anythingChanged ||= extractedText || extractedScreenshot; - break; - } - default: - throw new Error( - `[assetPreprocessing][${jobId}] Unsupported bookmark type`, - ); - } - - if (!isFixMode || anythingChanged) { - await OpenAIQueue.enqueue({ - bookmarkId, - }); - - // Update the search index - await triggerSearchReindex(bookmarkId); - } -} |
