import os from "os"; import { eq } from "drizzle-orm"; import { DequeuedJob, Runner } from "liteque"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; import type { AssetPreprocessingRequest } from "@hoarder/shared/queues"; import { db } from "@hoarder/db"; import { assets, AssetTypes, bookmarkAssets, bookmarks, } from "@hoarder/db/schema"; import { newAssetId, readAsset, saveAsset } from "@hoarder/shared/assetdb"; import serverConfig from "@hoarder/shared/config"; import logger from "@hoarder/shared/logger"; import { AssetPreprocessingQueue, OpenAIQueue, triggerSearchReindex, } from "@hoarder/shared/queues"; export class AssetPreprocessingWorker { static build() { logger.info("Starting asset preprocessing worker ..."); const worker = new Runner( AssetPreprocessingQueue, { run: run, onComplete: async (job) => { const jobId = job.id; logger.info(`[assetPreprocessing][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: async (job) => { const jobId = job.id; logger.error( `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`, ); return Promise.resolve(); }, }, { concurrency: 1, pollIntervalMs: 1000, timeoutSecs: 30, }, ); return worker; } } async function readImageText(buffer: Buffer) { if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") { return null; } const worker = await createWorker(serverConfig.ocr.langs, undefined, { cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(), }); try { const ret = await worker.recognize(buffer); if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) { return null; } return ret.data.text; } finally { await worker.terminate(); } } async function readPDFText(buffer: Buffer): Promise<{ text: string; metadata: Record; }> { return new Promise((resolve, reject) => { const pdfParser = new PDFParser(null, true); pdfParser.on("pdfParser_dataError", reject); pdfParser.on("pdfParser_dataReady", (pdfData) => { resolve({ text: pdfParser.getRawTextContent(), metadata: pdfData.Meta, }); }); pdfParser.parseBuffer(buffer); }); } export async function extractAndSavePDFScreenshot( jobId: string, asset: Buffer, bookmark: NonNullable>>, isFixMode: boolean, ): Promise { { const alreadyHasScreenshot = bookmark.assets.find( (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT, ) !== undefined; if (alreadyHasScreenshot && isFixMode) { logger.info( `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`, ); return false; } } logger.info( `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`, ); try { /** * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md */ const screenshot = await fromBuffer(asset, { density: 100, quality: 100, format: "png", preserveAspectRatio: true, })(1, { responseType: "buffer" }); if (!screenshot.buffer) { logger.error( `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`, ); return false; } // Store the screenshot const assetId = newAssetId(); const fileName = "screenshot.png"; const contentType = "image/png"; await saveAsset({ userId: bookmark.userId, assetId, asset: screenshot.buffer, metadata: { contentType, fileName, }, }); // Insert into database await db.insert(assets).values({ id: assetId, bookmarkId: bookmark.id, userId: bookmark.userId, assetType: AssetTypes.ASSET_SCREENSHOT, contentType, size: screenshot.buffer.byteLength, fileName, }); logger.info( `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`, ); return true; } catch (error) { logger.error( `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, ); return false; } } async function extractAndSaveImageText( jobId: string, asset: Buffer, bookmark: NonNullable>>, isFixMode: boolean, ): Promise { { const alreadyHasText = !!bookmark.asset.content; if (alreadyHasText && isFixMode) { logger.info( `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`, ); return false; } } let imageText = null; logger.info( `[assetPreprocessing][${jobId}] Attempting to extract text from image.`, ); try { imageText = await readImageText(asset); } catch (e) { logger.error( `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`, ); } if (!imageText) { return false; } logger.info( `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`, ); await db .update(bookmarkAssets) .set({ content: imageText, metadata: null, }) .where(eq(bookmarkAssets.id, bookmark.id)); return true; } async function extractAndSavePDFText( jobId: string, asset: Buffer, bookmark: NonNullable>>, isFixMode: boolean, ): Promise { { const alreadyHasText = !!bookmark.asset.content; if (alreadyHasText && isFixMode) { logger.info( `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`, ); return false; } } logger.info( `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`, ); const pdfParse = await readPDFText(asset); if (!pdfParse?.text) { throw new Error( `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`, ); } logger.info( `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`, ); await db .update(bookmarkAssets) .set({ content: pdfParse.text, metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null, }) .where(eq(bookmarkAssets.id, bookmark.id)); return true; } async function getBookmark(bookmarkId: string) { return db.query.bookmarks.findFirst({ where: eq(bookmarks.id, bookmarkId), with: { asset: true, assets: true, }, }); } async function run(req: DequeuedJob) { const isFixMode = req.data.fixMode; const jobId = req.id; const bookmarkId = req.data.bookmarkId; const bookmark = await db.query.bookmarks.findFirst({ where: eq(bookmarks.id, bookmarkId), with: { asset: true, assets: true, }, }); logger.info( `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`, ); if (!bookmark) { throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`); } if (!bookmark.asset) { throw new Error( `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`, ); } const { asset } = await readAsset({ userId: bookmark.userId, assetId: bookmark.asset.assetId, }); if (!asset) { throw new Error( `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`, ); } let anythingChanged = false; switch (bookmark.asset.assetType) { case "image": { const extarctedText = await extractAndSaveImageText( jobId, asset, bookmark, isFixMode, ); anythingChanged ||= extarctedText; break; } case "pdf": { const extractedText = await extractAndSavePDFText( jobId, asset, bookmark, isFixMode, ); const extractedScreenshot = await extractAndSavePDFScreenshot( jobId, asset, bookmark, isFixMode, ); anythingChanged ||= extractedText || extractedScreenshot; break; } default: throw new Error( `[assetPreprocessing][${jobId}] Unsupported bookmark type`, ); } if (!isFixMode || anythingChanged) { await OpenAIQueue.enqueue({ bookmarkId, }); // Update the search index await triggerSearchReindex(bookmarkId); } }