aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/assetPreprocessingWorker.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-05-18 20:22:59 +0000
committerMohamed Bassem <me@mbassem.com>2025-05-18 20:22:59 +0000
commit2743d9e38ecfdbf757d4d2f97bcf09d601245b59 (patch)
tree3119581aafce5321aaba9719ba3b2597d000d564 /apps/workers/assetPreprocessingWorker.ts
parenta5ae67c241d8cdd452acd4d98800ec61740c041f (diff)
downloadkarakeep-2743d9e38ecfdbf757d4d2f97bcf09d601245b59.tar.zst
feat: Add AI auto summarization. Fixes #1163
Diffstat (limited to 'apps/workers/assetPreprocessingWorker.ts')
-rw-r--r--apps/workers/assetPreprocessingWorker.ts338
1 files changed, 0 insertions, 338 deletions
diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/assetPreprocessingWorker.ts
deleted file mode 100644
index a678b706..00000000
--- a/apps/workers/assetPreprocessingWorker.ts
+++ /dev/null
@@ -1,338 +0,0 @@
-import os from "os";
-import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
-import PDFParser from "pdf2json";
-import { fromBuffer } from "pdf2pic";
-import { createWorker } from "tesseract.js";
-
-import type { AssetPreprocessingRequest } from "@karakeep/shared/queues";
-import { db } from "@karakeep/db";
-import {
- assets,
- AssetTypes,
- bookmarkAssets,
- bookmarks,
-} from "@karakeep/db/schema";
-import { newAssetId, readAsset, saveAsset } from "@karakeep/shared/assetdb";
-import serverConfig from "@karakeep/shared/config";
-import logger from "@karakeep/shared/logger";
-import {
- AssetPreprocessingQueue,
- OpenAIQueue,
- triggerSearchReindex,
-} from "@karakeep/shared/queues";
-
-export class AssetPreprocessingWorker {
- static build() {
- logger.info("Starting asset preprocessing worker ...");
- const worker = new Runner<AssetPreprocessingRequest>(
- AssetPreprocessingQueue,
- {
- run: run,
- onComplete: async (job) => {
- const jobId = job.id;
- logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
- return Promise.resolve();
- },
- onError: async (job) => {
- const jobId = job.id;
- logger.error(
- `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
- );
- return Promise.resolve();
- },
- },
- {
- concurrency: 1,
- pollIntervalMs: 1000,
- timeoutSecs: 30,
- },
- );
-
- return worker;
- }
-}
-
-async function readImageText(buffer: Buffer) {
- if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
- return null;
- }
- const worker = await createWorker(serverConfig.ocr.langs, undefined, {
- cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
- });
- try {
- const ret = await worker.recognize(buffer);
- if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
- return null;
- }
- return ret.data.text;
- } finally {
- await worker.terminate();
- }
-}
-
-async function readPDFText(buffer: Buffer): Promise<{
- text: string;
- metadata: Record<string, object>;
-}> {
- return new Promise((resolve, reject) => {
- const pdfParser = new PDFParser(null, true);
- pdfParser.on("pdfParser_dataError", reject);
- pdfParser.on("pdfParser_dataReady", (pdfData) => {
- resolve({
- text: pdfParser.getRawTextContent(),
- metadata: pdfData.Meta,
- });
- });
- pdfParser.parseBuffer(buffer);
- });
-}
-
-export async function extractAndSavePDFScreenshot(
- jobId: string,
- asset: Buffer,
- bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
- isFixMode: boolean,
-): Promise<boolean> {
- {
- const alreadyHasScreenshot =
- bookmark.assets.find(
- (r) => r.assetType === AssetTypes.ASSET_SCREENSHOT,
- ) !== undefined;
- if (alreadyHasScreenshot && isFixMode) {
- logger.info(
- `[assetPreprocessing][${jobId}] Skipping PDF screenshot generation as it's already been generated.`,
- );
- return false;
- }
- }
- logger.info(
- `[assetPreprocessing][${jobId}] Attempting to generate PDF screenshot for bookmarkId: ${bookmark.id}`,
- );
- try {
- /**
- * If you encountered any issues with this library, make sure you have ghostscript and graphicsmagick installed following this URL
- * https://github.com/yakovmeister/pdf2image/blob/HEAD/docs/gm-installation.md
- */
- const screenshot = await fromBuffer(asset, {
- density: 100,
- quality: 100,
- format: "png",
- preserveAspectRatio: true,
- })(1, { responseType: "buffer" });
-
- if (!screenshot.buffer) {
- logger.error(
- `[assetPreprocessing][${jobId}] Failed to generate PDF screenshot`,
- );
- return false;
- }
-
- // Store the screenshot
- const assetId = newAssetId();
- const fileName = "screenshot.png";
- const contentType = "image/png";
- await saveAsset({
- userId: bookmark.userId,
- assetId,
- asset: screenshot.buffer,
- metadata: {
- contentType,
- fileName,
- },
- });
-
- // Insert into database
- await db.insert(assets).values({
- id: assetId,
- bookmarkId: bookmark.id,
- userId: bookmark.userId,
- assetType: AssetTypes.ASSET_SCREENSHOT,
- contentType,
- size: screenshot.buffer.byteLength,
- fileName,
- });
-
- logger.info(
- `[assetPreprocessing][${jobId}] Successfully saved PDF screenshot to database`,
- );
- return true;
- } catch (error) {
- logger.error(
- `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`,
- );
- return false;
- }
-}
-
-async function extractAndSaveImageText(
- jobId: string,
- asset: Buffer,
- bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
- isFixMode: boolean,
-): Promise<boolean> {
- {
- const alreadyHasText = !!bookmark.asset.content;
- if (alreadyHasText && isFixMode) {
- logger.info(
- `[assetPreprocessing][${jobId}] Skipping image text extraction as it's already been extracted.`,
- );
- return false;
- }
- }
- let imageText = null;
- logger.info(
- `[assetPreprocessing][${jobId}] Attempting to extract text from image.`,
- );
- try {
- imageText = await readImageText(asset);
- } catch (e) {
- logger.error(
- `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
- );
- }
- if (!imageText) {
- return false;
- }
-
- logger.info(
- `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`,
- );
- await db
- .update(bookmarkAssets)
- .set({
- content: imageText,
- metadata: null,
- })
- .where(eq(bookmarkAssets.id, bookmark.id));
- return true;
-}
-
-async function extractAndSavePDFText(
- jobId: string,
- asset: Buffer,
- bookmark: NonNullable<Awaited<ReturnType<typeof getBookmark>>>,
- isFixMode: boolean,
-): Promise<boolean> {
- {
- const alreadyHasText = !!bookmark.asset.content;
- if (alreadyHasText && isFixMode) {
- logger.info(
- `[assetPreprocessing][${jobId}] Skipping PDF text extraction as it's already been extracted.`,
- );
- return false;
- }
- }
- logger.info(
- `[assetPreprocessing][${jobId}] Attempting to extract text from pdf.`,
- );
- const pdfParse = await readPDFText(asset);
- if (!pdfParse?.text) {
- throw new Error(
- `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
- );
- }
- logger.info(
- `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`,
- );
- await db
- .update(bookmarkAssets)
- .set({
- content: pdfParse.text,
- metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null,
- })
- .where(eq(bookmarkAssets.id, bookmark.id));
- return true;
-}
-
-async function getBookmark(bookmarkId: string) {
- return db.query.bookmarks.findFirst({
- where: eq(bookmarks.id, bookmarkId),
- with: {
- asset: true,
- assets: true,
- },
- });
-}
-
-async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
- const isFixMode = req.data.fixMode;
- const jobId = req.id;
- const bookmarkId = req.data.bookmarkId;
-
- const bookmark = await db.query.bookmarks.findFirst({
- where: eq(bookmarks.id, bookmarkId),
- with: {
- asset: true,
- assets: true,
- },
- });
-
- logger.info(
- `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`,
- );
-
- if (!bookmark) {
- throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`);
- }
-
- if (!bookmark.asset) {
- throw new Error(
- `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`,
- );
- }
-
- const { asset } = await readAsset({
- userId: bookmark.userId,
- assetId: bookmark.asset.assetId,
- });
-
- if (!asset) {
- throw new Error(
- `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`,
- );
- }
-
- let anythingChanged = false;
- switch (bookmark.asset.assetType) {
- case "image": {
- const extractedText = await extractAndSaveImageText(
- jobId,
- asset,
- bookmark,
- isFixMode,
- );
- anythingChanged ||= extractedText;
- break;
- }
- case "pdf": {
- const extractedText = await extractAndSavePDFText(
- jobId,
- asset,
- bookmark,
- isFixMode,
- );
- const extractedScreenshot = await extractAndSavePDFScreenshot(
- jobId,
- asset,
- bookmark,
- isFixMode,
- );
- anythingChanged ||= extractedText || extractedScreenshot;
- break;
- }
- default:
- throw new Error(
- `[assetPreprocessing][${jobId}] Unsupported bookmark type`,
- );
- }
-
- if (!isFixMode || anythingChanged) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- });
-
- // Update the search index
- await triggerSearchReindex(bookmarkId);
- }
-}