about summary refs log tree commit diff stats
path: root/apps/workers
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/assetPreprocessingWorker.ts197
-rw-r--r--apps/workers/crawlerWorker.ts35
-rw-r--r--apps/workers/index.ts23
-rw-r--r--apps/workers/openaiWorker.ts50
-rw-r--r--apps/workers/utils.ts44
5 files changed, 231 insertions, 118 deletions
diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/assetPreprocessingWorker.ts
new file mode 100644
index 00000000..5c4937e5
--- /dev/null
+++ b/apps/workers/assetPreprocessingWorker.ts
@@ -0,0 +1,197 @@
+import os from "os";
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import PDFParser from "pdf2json";
+import { createWorker } from "tesseract.js";
+
+import type { AssetPreprocessingRequest } from "@hoarder/shared/queues";
+import { db } from "@hoarder/db";
+import { bookmarkAssets, bookmarks } from "@hoarder/db/schema";
+import { readAsset } from "@hoarder/shared/assetdb";
+import serverConfig from "@hoarder/shared/config";
+import logger from "@hoarder/shared/logger";
+import {
+ AssetPreprocessingQueue,
+ OpenAIQueue,
+ triggerSearchReindex,
+} from "@hoarder/shared/queues";
+
+export class AssetPreprocessingWorker {
+ static build() {
+ logger.info("Starting asset preprocessing worker ...");
+ const worker = new Runner<AssetPreprocessingRequest>(
+ AssetPreprocessingQueue,
+ {
+ run: run,
+ onComplete: async (job) => {
+ const jobId = job.id;
+ logger.info(`[assetPreprocessing][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: async (job) => {
+ const jobId = job.id;
+ logger.error(
+ `[assetPreprocessing][${jobId}] Asset preprocessing failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 30,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function readImageText(buffer: Buffer) {
+ if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
+ return null;
+ }
+ const worker = await createWorker(serverConfig.ocr.langs, undefined, {
+ cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
+ });
+ try {
+ const ret = await worker.recognize(buffer);
+ if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
+ return null;
+ }
+ return ret.data.text;
+ } finally {
+ await worker.terminate();
+ }
+}
+
+async function readPDFText(buffer: Buffer): Promise<{
+ text: string;
+ metadata: Record<string, string>;
+}> {
+ return new Promise((resolve, reject) => {
+ // Need raw text flag represents as number (1), reference : https://github.com/modesty/pdf2json/issues/76#issuecomment-236569265
+ const pdfParser = new PDFParser(null, 1);
+ pdfParser.on("pdfParser_dataError", reject);
+ pdfParser.on("pdfParser_dataReady", (pdfData) => {
+ resolve({
+ // The type isn't set correctly, reference : https://github.com/modesty/pdf2json/issues/327
+ // eslint-disable-next-line
+ text: (pdfParser as any).getRawTextContent(),
+ metadata: pdfData.Meta,
+ });
+ });
+ pdfParser.parseBuffer(buffer);
+ });
+}
+
+async function preprocessImage(
+ jobId: string,
+ asset: Buffer,
+): Promise<{ content: string; metadata: string | null } | undefined> {
+ let imageText = null;
+ try {
+ imageText = await readImageText(asset);
+ } catch (e) {
+ logger.error(
+ `[assetPreprocessing][${jobId}] Failed to read image text: ${e}`,
+ );
+ }
+ if (!imageText) {
+ return undefined;
+ }
+
+ logger.info(
+ `[assetPreprocessing][${jobId}] Extracted ${imageText.length} characters from image.`,
+ );
+ return { content: imageText, metadata: null };
+}
+
+async function preProcessPDF(
+ jobId: string,
+ asset: Buffer,
+): Promise<{ content: string; metadata: string | null } | undefined> {
+ const pdfParse = await readPDFText(asset);
+ if (!pdfParse?.text) {
+ throw new Error(
+ `[assetPreprocessing][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
+ );
+ }
+ logger.info(
+ `[assetPreprocessing][${jobId}] Extracted ${pdfParse.text.length} characters from pdf.`,
+ );
+ return {
+ content: pdfParse.text,
+ metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null,
+ };
+}
+
+async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
+ const jobId = req.id;
+ const bookmarkId = req.data.bookmarkId;
+
+ const bookmark = await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, bookmarkId),
+ with: {
+ asset: true,
+ },
+ });
+
+ logger.info(
+ `[assetPreprocessing][${jobId}] Starting an asset preprocessing job for bookmark with id "${bookmarkId}"`,
+ );
+
+ if (!bookmark) {
+ throw new Error(`[assetPreprocessing][${jobId}] Bookmark not found`);
+ }
+
+ if (!bookmark.asset) {
+ throw new Error(
+ `[assetPreprocessing][${jobId}] Bookmark is not an asset (not an image or pdf)`,
+ );
+ }
+
+ const { asset } = await readAsset({
+ userId: bookmark.userId,
+ assetId: bookmark.asset.assetId,
+ });
+
+ if (!asset) {
+ throw new Error(
+ `[assetPreprocessing][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmarkId} not found`,
+ );
+ }
+
+ let result: { content: string; metadata: string | null } | undefined =
+ undefined;
+
+ switch (bookmark.asset.assetType) {
+ case "image":
+ result = await preprocessImage(jobId, asset);
+ break;
+ case "pdf":
+ result = await preProcessPDF(jobId, asset);
+ break;
+ default:
+ throw new Error(
+ `[assetPreprocessing][${jobId}] Unsupported bookmark type`,
+ );
+ }
+
+ if (result) {
+ await db
+ .update(bookmarkAssets)
+ .set({
+ content: result.content,
+ metadata: result.metadata,
+ })
+ .where(eq(bookmarkAssets.id, bookmarkId));
+ }
+
+ await OpenAIQueue.enqueue({
+ bookmarkId,
+ });
+
+ // Update the search index
+ await triggerSearchReindex(bookmarkId);
+}
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 208de44b..252da3b2 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -49,6 +49,7 @@ import {
import serverConfig from "@hoarder/shared/config";
import logger from "@hoarder/shared/logger";
import {
+ AssetPreprocessingQueue,
LinkCrawlerQueue,
OpenAIQueue,
triggerSearchReindex,
@@ -568,6 +569,9 @@ async function handleAsAssetBookmark(
.where(eq(bookmarks.id, bookmarkId));
await trx.delete(bookmarkLinks).where(eq(bookmarkLinks.id, bookmarkId));
});
+ await AssetPreprocessingQueue.enqueue({
+ bookmarkId,
+ });
}
async function crawlAndParseUrl(
@@ -709,9 +713,6 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
// Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
- let archivalLogic: () => Promise<void> = () => {
- return Promise.resolve();
- };
if (isPdf) {
await handleAsAssetBookmark(url, "pdf", userId, jobId, bookmarkId);
} else if (
@@ -721,7 +722,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
) {
await handleAsAssetBookmark(url, "image", userId, jobId, bookmarkId);
} else {
- archivalLogic = await crawlAndParseUrl(
+ const archivalLogic = await crawlAndParseUrl(
url,
userId,
jobId,
@@ -731,21 +732,21 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
oldFullPageArchiveAssetId,
archiveFullPage,
);
- }
- // Enqueue openai job (if not set, assume it's true for backward compatibility)
- if (job.data.runInference !== false) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- });
- }
+ // Enqueue openai job (if not set, assume it's true for backward compatibility)
+ if (job.data.runInference !== false) {
+ await OpenAIQueue.enqueue({
+ bookmarkId,
+ });
+ }
- // Update the search index
- await triggerSearchReindex(bookmarkId);
+ // Update the search index
+ await triggerSearchReindex(bookmarkId);
- // Trigger a potential download of a video from the URL
- await triggerVideoWorker(bookmarkId, url);
+ // Trigger a potential download of a video from the URL
+ await triggerVideoWorker(bookmarkId, url);
- // Do the archival as a separate last step as it has the potential for failure
- await archivalLogic();
+ // Do the archival as a separate last step as it has the potential for failure
+ await archivalLogic();
+ }
}
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index e333ab0d..c2d3f28a 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,5 +1,6 @@
import "dotenv/config";
+import { AssetPreprocessingWorker } from "assetPreprocessingWorker";
import { FeedRefreshingWorker, FeedWorker } from "feedWorker";
import { TidyAssetsWorker } from "tidyAssetsWorker";
@@ -17,14 +18,16 @@ async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
runQueueDBMigrations();
- const [crawler, openai, search, tidyAssets, video, feed] = [
- await CrawlerWorker.build(),
- OpenAiWorker.build(),
- SearchIndexingWorker.build(),
- TidyAssetsWorker.build(),
- VideoWorker.build(),
- FeedWorker.build(),
- ];
+ const [crawler, openai, search, tidyAssets, video, feed, assetPreprocessing] =
+ [
+ await CrawlerWorker.build(),
+ OpenAiWorker.build(),
+ SearchIndexingWorker.build(),
+ TidyAssetsWorker.build(),
+ VideoWorker.build(),
+ FeedWorker.build(),
+ AssetPreprocessingWorker.build(),
+ ];
FeedRefreshingWorker.start();
await Promise.any([
@@ -35,11 +38,12 @@ async function main() {
tidyAssets.run(),
video.run(),
feed.run(),
+ assetPreprocessing.run(),
]),
shutdownPromise,
]);
logger.info(
- "Shutting down crawler, openai, tidyAssets, video, feed and search workers ...",
+ "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing and search workers ...",
);
FeedRefreshingWorker.stop();
@@ -49,6 +53,7 @@ async function main() {
tidyAssets.stop();
video.stop();
feed.stop();
+ assetPreprocessing.stop();
}
main();
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 0e3193b6..bad06bb3 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -7,7 +7,6 @@ import type { InferenceClient } from "@hoarder/shared/inference";
import type { ZOpenAIRequest } from "@hoarder/shared/queues";
import { db } from "@hoarder/db";
import {
- bookmarkAssets,
bookmarks,
bookmarkTags,
customPrompts,
@@ -24,8 +23,6 @@ import {
zOpenAIRequestSchema,
} from "@hoarder/shared/queues";
-import { readImageText, readPDFText } from "./utils";
-
const openAIResponseSchema = z.object({
tags: z.array(z.string()),
});
@@ -156,25 +153,6 @@ async function inferTagsFromImage(
);
}
- let imageText = null;
- try {
- imageText = await readImageText(asset);
- } catch (e) {
- logger.error(`[inference][${jobId}] Failed to read image text: ${e}`);
- }
-
- if (imageText) {
- logger.info(
- `[inference][${jobId}] Extracted ${imageText.length} characters from image.`,
- );
- await db
- .update(bookmarkAssets)
- .set({
- content: imageText,
- })
- .where(eq(bookmarkAssets.id, bookmark.id));
- }
-
const base64 = asset.toString("base64");
return inferenceClient.inferFromImage(
buildImagePrompt(
@@ -245,38 +223,14 @@ function containsTagsPlaceholder(prompts: { text: string }[]): boolean {
}
async function inferTagsFromPDF(
- jobId: string,
+ _jobId: string,
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
) {
- const { asset } = await readAsset({
- userId: bookmark.userId,
- assetId: bookmark.asset.assetId,
- });
- if (!asset) {
- throw new Error(
- `[inference][${jobId}] AssetId ${bookmark.asset.assetId} for bookmark ${bookmark.id} not found`,
- );
- }
- const pdfParse = await readPDFText(asset);
- if (!pdfParse?.text) {
- throw new Error(
- `[inference][${jobId}] PDF text is empty. Please make sure that the PDF includes text and not just images.`,
- );
- }
-
- await db
- .update(bookmarkAssets)
- .set({
- content: pdfParse.text,
- metadata: pdfParse.metadata ? JSON.stringify(pdfParse.metadata) : null,
- })
- .where(eq(bookmarkAssets.id, bookmark.id));
-
const prompt = buildTextPrompt(
serverConfig.inference.inferredTagLang,
await fetchCustomPrompts(bookmark.userId, "text"),
- `Content: ${pdfParse.text}`,
+ `Content: ${bookmark.asset.content}`,
serverConfig.inference.contextLength,
);
return inferenceClient.inferFromText(prompt, { json: true });
diff --git a/apps/workers/utils.ts b/apps/workers/utils.ts
index 15634902..2f56d3f0 100644
--- a/apps/workers/utils.ts
+++ b/apps/workers/utils.ts
@@ -1,9 +1,3 @@
-import os from "os";
-import PDFParser from "pdf2json";
-import { createWorker } from "tesseract.js";
-
-import serverConfig from "@hoarder/shared/config";
-
export function withTimeout<T, Ret>(
func: (param: T) => Promise<Ret>,
timeoutSec: number,
@@ -20,41 +14,3 @@ export function withTimeout<T, Ret>(
]);
};
}
-
-export async function readImageText(buffer: Buffer) {
- if (serverConfig.ocr.langs.length == 1 && serverConfig.ocr.langs[0] == "") {
- return null;
- }
- const worker = await createWorker(serverConfig.ocr.langs, undefined, {
- cachePath: serverConfig.ocr.cacheDir ?? os.tmpdir(),
- });
- try {
- const ret = await worker.recognize(buffer);
- if (ret.data.confidence <= serverConfig.ocr.confidenceThreshold) {
- return null;
- }
- return ret.data.text;
- } finally {
- await worker.terminate();
- }
-}
-
-export async function readPDFText(buffer: Buffer): Promise<{
- text: string;
- metadata: Record<string, string>;
-}> {
- return new Promise((resolve, reject) => {
- // Need raw text flag represents as number (1), reference : https://github.com/modesty/pdf2json/issues/76#issuecomment-236569265
- const pdfParser = new PDFParser(null, 1);
- pdfParser.on("pdfParser_dataError", reject);
- pdfParser.on("pdfParser_dataReady", (pdfData) => {
- resolve({
- // The type isn't set correctly, reference : https://github.com/modesty/pdf2json/issues/327
- // eslint-disable-next-line
- text: (pdfParser as any).getRawTextContent(),
- metadata: pdfData.Meta,
- });
- });
- pdfParser.parseBuffer(buffer);
- });
-}