-rw-r--r--  apps/workers/assetPreprocessingWorker.ts   197
-rw-r--r--  apps/workers/crawlerWorker.ts               35
-rw-r--r--  apps/workers/index.ts                       23
-rw-r--r--  apps/workers/openaiWorker.ts                50
-rw-r--r--  apps/workers/utils.ts                       44
-rw-r--r--  packages/shared/queues.ts                   19
-rw-r--r--  packages/trpc/routers/bookmarks.ts          10
7 files changed, 258 insertions(+), 120 deletions(-)
diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/assetPreprocessingWorker.ts
new file mode 100644
index 00000000..5c4937e5
--- /dev/null
+++ b/apps/workers/assetPreprocessingWorker.ts
@@ -0,0 +1,197 @@
+import os from "os";
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import PDFParser from "pdf2json";
+import { createWorker } from "tesseract.js";
+
+import type { AssetPreprocessingRequest } from "@hoarder/shared/queues";
+import { db } from "@hoarder/db";
+import { bookmarkAssets, bookmarks } from "@hoarder/db/schema";
+import { readAsset } from "@hoarder/shared/assetdb";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+  AssetPreprocessingQueue,
+  OpenAIQueue,
+  triggerSearchReindex,
+} from "@hoarder/shared/queues";
+
+export class AssetPreprocessingWorker {
+  static build() {
+    logger.info("Starting asset preprocessing worker ...");
+    const worker = new Runner<AssetPreprocessingRequest>(
+      AssetPreprocessingQueue,
+      {
+        run: run,
+        onComplete: async (job) => {
+          const jobId = job.id;
+          logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
+          return Promise.resolve();
+        },
+        onError: async (job) => {
+          const jobId = job.id;
+          logger.error(
+            `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
+          );
+          return Promise.resolve();
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: 30,
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function readImageText(buffer: Buffer) {
+  if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
+    return null;
+  }
+  const worker = await createWorker(serverConfig.ocr.langs, undefined, {
+    cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
+  });
+  try {
+    const ret = await worker.recognize(buffer);
+    if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
+      return null;
+    }
+    return ret.data.text;
+  } finally {
+    await worker.terminate();
+  }
+}
+
+async function readPDFText(buffer: Buffer): Promise<{
+  text: string;
+  metadata: Record<string, string>;
+}> {
+  return new Promise((resolve, reject) => {
+    // The raw-text flag must be passed as the number 1, reference: https://github.com/modesty/pdf2json/issues/76#issuecomment-236569265
+    const pdfParser = new PDFParser(null, 1);
+    pdfParser.on("pdfParser_dataError", reject);
+    pdfParser.on("pdfParser_dataReady", (pdfData) => {
+      resolve({
+        // The type isn't set correctly, reference: https://github.com/modesty/pdf2json/issues/327
+        // eslint-disable-next-line
+        text: (pdfParser as any).getRawTextContent(),
+        metadata: pdfData.Meta,
+      });
+    });
+    pdfParser.parseBuffer(buffer);
+  });
+}
+
+async function preprocessImage(
+  jobId: string,
+  asset: Buffer,
+): Promise<{ content: string; metadata: string | null } | undefined> {
+  let imageText = null;
+  try {
+    imageText = await readImageText(asset);
+  } catch (e) {
+    logger.error(
+      `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
+    );
+  }
+  if (!imageText) {
+    return undefined;
+  }
+
+  logger.info(
+    `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`,
+  );
+  return { content: imageText, metadata: null };
+}
+
+async function preProcessPDF(
+  jobId: string,
+  asset: Buffer,
+): Promise<{ content: string; metadata: string | null } | undefined> {
+  const pdfParse = await readPDFText(asset);
+  if (!pdfParse?.text) {
+    throw new Error(
+      `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
+    );
+  }
+  logger.info(
+    `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`,
+  );
+  return {
+    content: pdfParse.text,
+    metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null,
+  };
+}
+
+async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
+  const jobId = req.id;
+  const bookmarkId = req.data.bookmarkId;
+
+  const bookmark = await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, bookmarkId),
+    with: {
+      asset: true,
+    },
+  });
+
+  logger.info(
+    `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`,
+  );
+
+  if (!bookmark) {
+    throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`);
+  }
+
+  if (!bookmark.asset) {
+    throw new Error(
+      `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`,
+    );
+  }
+
+  const { asset } = await readAsset({
+    userId: bookmark.userId,
+    assetId: bookmark.asset.assetId,
+  });
+
+  if (!asset) {
+    throw new Error(
+      `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`,
+    );
+  }
+
+  let result: { content: string; metadata: string | null } | undefined =
+    undefined;
+
+  switch (bookmark.asset.assetType) {
+    case "image":
+      result = await preprocessImage(jobId, asset);
+      break;
+    case "pdf":
+      result = await preProcessPDF(jobId, asset);
+      break;
+    default:
+      throw new Error(
+        `[assetPreprocessing][${jobId}] Unsupported bookmark type`,
+      );
+  }
+
+  if (result) {
+    await db
+      .update(bookmarkAssets)
+      .set({
+        content: result.content,
+        metadata: result.metadata,
+      })
+      .where(eq(bookmarkAssets.id, bookmarkId));
+  }
+
+  await OpenAIQueue.enqueue({
+    bookmarkId,
+  });
+
+  // Update the search index
+  await triggerSearchReindex(bookmarkId);
+}
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 208de44b..252da3b2 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -49,6 +49,7 @@ import {
 import serverConfig from "@hoarder/shared/config";
 import logger from "@hoarder/shared/logger";
 import {
+  AssetPreprocessingQueue,
   LinkCrawlerQueue,
   OpenAIQueue,
   triggerSearchReindex,
@@ -568,6 +569,9 @@ async function handleAsAssetBookmark(
       .where(eq(bookmarks.id, bookmarkId));
     await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
   });
+  await AssetPreprocessingQueue.enqueue({
+    bookmarkId,
+  });
 }
 
 async function crawlAndParseUrl(
@@ -709,9 +713,6 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
   const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
 
-  let archivalLogic: () => Promise<void> = () => {
-    return Promise.resolve();
-  };
   if (isPdf) {
     await handleAsAssetBookmark(url, "pdf", userId, jobId, bookmarkId);
   } else if (
@@ -721,7 +722,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   ) {
     await handleAsAssetBookmark(url, "image", userId, jobId, bookmarkId);
   } else {
-    archivalLogic = await crawlAndParseUrl(
+    const archivalLogic = await crawlAndParseUrl(
       url,
       userId,
       jobId,
@@ -731,21 +732,21 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
       oldFullPageArchiveAssetId,
       archiveFullPage,
     );
-  }
 
-  // Enqueue openai job (if not set, assume it's true for backward compatibility)
-  if (job.data.runInference !== false) {
-    await OpenAIQueue.enqueue({
-      bookmarkId,
-    });
-  }
+    // Enqueue openai job (if not set, assume it's true for backward compatibility)
+    if (job.data.runInference !== false) {
+      await OpenAIQueue.enqueue({
+        bookmarkId,
+      });
+    }
 
-  // Update the search index
-  await triggerSearchReindex(bookmarkId);
+    // Update the search index
+    await triggerSearchReindex(bookmarkId);
 
-  // Trigger a potential download of a video from the URL
-  await triggerVideoWorker(bookmarkId, url);
+    // Trigger a potential download of a video from the URL
+    await triggerVideoWorker(bookmarkId, url);
 
-  // Do the archival as a separate last step as it has the potential for failure
-  await archivalLogic();
+    // Do the archival as a separate last step as it has the potential for failure
+    await archivalLogic();
+  }
 }
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index e333ab0d..c2d3f28a 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,5 +1,6 @@
 import "dotenv/config";
 
+import { AssetPreprocessingWorker } from "assetPreprocessingWorker";
 import { FeedRefreshingWorker, FeedWorker } from "feedWorker";
 import { TidyAssetsWorker } from "tidyAssetsWorker";
@@ -17,14 +18,16 @@ async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
   runQueueDBMigrations();
 
-  const [crawler, openai, search, tidyAssets, video, feed] = [
-    await CrawlerWorker.build(),
-    OpenAiWorker.build(),
-    SearchIndexingWorker.build(),
-    TidyAssetsWorker.build(),
-    VideoWorker.build(),
-    FeedWorker.build(),
-  ];
+  const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
+    [
+      await CrawlerWorker.build(),
+      OpenAiWorker.build(),
+      SearchIndexingWorker.build(),
+      TidyAssetsWorker.build(),
+      VideoWorker.build(),
+      FeedWorker.build(),
+      AssetPreprocessingWorker.build(),
+    ];
 
   FeedRefreshingWorker.start();
   await Promise.any([
@@ -35,11 +38,12 @@ async function main() {
       tidyAssets.run(),
       video.run(),
       feed.run(),
+      assetPreprocessing.run(),
     ]),
     shutdownPromise,
   ]);
   logger.info(
-    "Shutting down crawler, openai, tidyAssets, video, feed and search workers ...",
+    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
   );
 
   FeedRefreshingWorker.stop();
@@ -49,6 +53,7 @@ async function main() {
   tidyAssets.stop();
   video.stop();
   feed.stop();
+  assetPreprocessing.stop();
 }
 
 main();
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 0e3193b6..bad06bb3 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -7,7 +7,6 @@ import type { InferenceClient } from "@hoarder/shared/inference";
 import type { ZOpenAIRequest } from "@hoarder/shared/queues";
 import { db } from "@hoarder/db";
 import {
-  bookmarkAssets,
   bookmarks,
   bookmarkTags,
   customPrompts,
@@ -24,8 +23,6 @@ import {
   zOpenAIRequestSchema,
 } from "@hoarder/shared/queues";
 
-import { readImageText, readPDFText } from "./utils";
-
 const openAIResponseSchema = z.object({
   tags: z.array(z.string()),
 });
@@ -156,25 +153,6 @@ async function inferTagsFromImage(
     );
   }
 
-  let imageText = null;
-  try {
-    imageText = await readImageText(asset);
-  } catch (e) {
-    logger.error(`[inference][${jobId}] Failed to read image text: ${e}`);
-  }
-
-  if (imageText) {
-    logger.info(
-      `[inference][${jobId}] Extracted ${imageText.length} characters from image.`,
-    );
-    await db
-      .update(bookmarkAssets)
-      .set({
-        content: imageText,
-      })
-      .where(eq(bookmarkAssets.id, bookmark.id));
-  }
-
   const base64 = asset.toString("base64");
   return inferenceClient.inferFromImage(
     buildImagePrompt(
@@ -245,38 +223,14 @@ function containsTagsPlaceholder(prompts: { text: string }[]): boolean {
 }
 
 async function inferTagsFromPDF(
-  jobId: string,
+  _jobId: string,
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
   inferenceClient: InferenceClient,
 ) {
-  const { asset } = await readAsset({
-    userId: bookmark.userId,
-    assetId: bookmark.asset.assetId,
-  });
-  if (!asset) {
-    throw new Error(
-      `[inference][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmark.id} not found`,
-    );
-  }
-  const pdfParse = await readPDFText(asset);
-  if (!pdfParse?.text) {
-    throw new Error(
-      `[inference][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
-    );
-  }
-
-  await db
-    .update(bookmarkAssets)
-    .set({
-      content: pdfParse.text,
-      metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null,
-    })
-    .where(eq(bookmarkAssets.id, bookmark.id));
-
   const prompt = buildTextPrompt(
     serverConfig.inference.inferredTagLang,
     await fetchCustomPrompts(bookmark.userId, "text"),
-    `Content: ${pdfParse.text}`,
+    `Content: ${bookmark.asset.content}`,
     serverConfig.inference.contextLength,
   );
   return inferenceClient.inferFromText(prompt, { json: true });
diff --git a/apps/workers/utils.ts b/apps/workers/utils.ts
index 15634902..2f56d3f0 100644
--- a/apps/workers/utils.ts
+++ b/apps/workers/utils.ts
@@ -1,9 +1,3 @@
-import os from "os";
-import PDFParser from "pdf2json";
-import { createWorker } from "tesseract.js";
-
-import serverConfig from "@hoarder/shared/config";
-
 export function withTimeout<T, Ret>(
   func: (param: T) => Promise<Ret>,
   timeoutSec: number,
@@ -20,41 +14,3 @@ export function withTimeout<T, Ret>(
     ]);
   };
 }
-
-export async function readImageText(buffer: Buffer) {
-  if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
-    return null;
-  }
-  const worker = await createWorker(serverConfig.ocr.langs, undefined, {
-    cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
-  });
-  try {
-    const ret = await worker.recognize(buffer);
-    if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
-      return null;
-    }
-    return ret.data.text;
-  } finally {
-    await worker.terminate();
-  }
-}
-
-export async function readPDFText(buffer: Buffer): Promise<{
-  text: string;
-  metadata: Record<string, string>;
-}> {
-  return new Promise((resolve, reject) => {
-    // The raw-text flag must be passed as the number 1, reference: https://github.com/modesty/pdf2json/issues/76#issuecomment-236569265
-    const pdfParser = new PDFParser(null, 1);
-    pdfParser.on("pdfParser_dataError", reject);
-    pdfParser.on("pdfParser_dataReady", (pdfData) => {
-      resolve({
-        // The type isn't set correctly, reference: https://github.com/modesty/pdf2json/issues/327
-        // eslint-disable-next-line
-        text: (pdfParser as any).getRawTextContent(),
-        metadata: pdfData.Meta,
-      });
-    });
-    pdfParser.parseBuffer(buffer);
-  });
-}
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index a887417d..7afb8774 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -139,3 +139,22 @@ export const FeedQueue = new SqliteQueue<ZFeedRequestSchema>(
     keepFailedJobs: false,
   },
 );
+
+// Preprocess Assets
+export const zAssetPreprocessingRequestSchema = z.object({
+  bookmarkId: z.string(),
+});
+export type AssetPreprocessingRequest = z.infer<
+  typeof zAssetPreprocessingRequestSchema
+>;
+export const AssetPreprocessingQueue =
+  new SqliteQueue<AssetPreprocessingRequest>(
+    "asset_preprocessing_queue",
+    queueDB,
+    {
+      defaultJobArgs: {
+        numRetries: 2,
+      },
+      keepFailedJobs: false,
+    },
+  );
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 8a4170cd..254ac6c2 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -26,6 +26,7 @@ import serverConfig from "@hoarder/shared/config";
 import { InferenceClientFactory } from "@hoarder/shared/inference";
 import { buildSummaryPrompt } from "@hoarder/shared/prompts";
 import {
+  AssetPreprocessingQueue,
   LinkCrawlerQueue,
   OpenAIQueue,
   triggerSearchDeletion,
@@ -378,13 +379,18 @@ export const bookmarksAppRouter = router({
         });
         break;
       }
-      case BookmarkTypes.TEXT:
-      case BookmarkTypes.ASSET: {
+      case BookmarkTypes.TEXT: {
         await OpenAIQueue.enqueue({
           bookmarkId: bookmark.id,
         });
         break;
       }
+      case BookmarkTypes.ASSET: {
+        await AssetPreprocessingQueue.enqueue({
+          bookmarkId: bookmark.id,
+        });
+        break;
+      }
     }
     await triggerSearchReindex(bookmark.id);
     return bookmark;
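
For readers tracing the new flow end to end, a minimal sketch of the hand-off this commit introduces. The wrapper function below is hypothetical and not part of the diff; `AssetPreprocessingQueue` and everything described in the comments comes from the changes above.

// Illustrative only: mirrors what packages/trpc/routers/bookmarks.ts
// now does for BookmarkTypes.ASSET.
import { AssetPreprocessingQueue } from "@hoarder/shared/queues";

export async function enqueueAssetBookmark(bookmarkId: string) {
  // 1. Asset bookmarks are handed to the new preprocessing queue instead of
  //    going straight to OpenAIQueue.
  await AssetPreprocessingQueue.enqueue({ bookmarkId });
  // 2. AssetPreprocessingWorker then OCRs images (tesseract.js) or extracts
  //    PDF text (pdf2json) and persists the result to
  //    bookmarkAssets.content / bookmarkAssets.metadata.
  // 3. The worker itself chains OpenAIQueue.enqueue() and
  //    triggerSearchReindex(), so tag inference reads the extracted text from
  //    the database rather than re-parsing the asset.
}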
