From 9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f Mon Sep 17 00:00:00 2001
From: Mohamed Bassem
Date: Sat, 12 Jul 2025 13:06:46 +0000
Subject: fix: Prioritize crawling user added links over bulk imports. fixes #1717

---
 apps/workers/package.json                        |  2 +-
 apps/workers/workers/assetPreprocessingWorker.ts | 19 +++++++----
 apps/workers/workers/crawlerWorker.ts            | 43 +++++++++++++++++-------
 apps/workers/workers/inference/summarize.ts      |  4 ++-
 apps/workers/workers/inference/tagging.ts        | 11 ++++--
 5 files changed, 55 insertions(+), 24 deletions(-)

(limited to 'apps/workers')

diff --git a/apps/workers/package.json b/apps/workers/package.json
index 595a6e00..4f080169 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -17,7 +17,7 @@
     "drizzle-orm": "^0.38.3",
     "execa": "9.3.1",
     "jsdom": "^24.0.0",
-    "liteque": "^0.3.2",
+    "liteque": "^0.4.1",
     "metascraper": "^5.46.18",
     "metascraper-amazon": "^5.46.18",
     "metascraper-author": "5.46.18",
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index 8231ce50..c3ecd1e0 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,6 +1,6 @@
 import os from "os";
 import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
 import PDFParser from "pdf2json";
 import { fromBuffer } from "pdf2pic";
 import { createWorker } from "tesseract.js";
@@ -345,13 +345,20 @@ async function run(req: DequeuedJob) {
     );
   }
 
+  // Propagate priority to child jobs
+  const enqueueOpts: EnqueueOptions = {
+    priority: req.priority,
+  };
   if (!isFixMode || anythingChanged) {
-    await OpenAIQueue.enqueue({
-      bookmarkId,
-      type: "tag",
-    });
+    await OpenAIQueue.enqueue(
+      {
+        bookmarkId,
+        type: "tag",
+      },
+      enqueueOpts,
+    );
 
     // Update the search index
-    await triggerSearchReindex(bookmarkId);
+    await triggerSearchReindex(bookmarkId, enqueueOpts);
   }
 }
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 428ec0f5..edd1d8f1 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -10,7 +10,7 @@ import { eq } from "drizzle-orm";
 import { execa } from "execa";
 import { isShuttingDown } from "exit";
 import { JSDOM, VirtualConsole } from "jsdom";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
 import metascraper from "metascraper";
 import metascraperAmazon from "metascraper-amazon";
 import metascraperAuthor from "metascraper-author";
@@ -56,8 +56,8 @@ import {
   LinkCrawlerQueue,
   OpenAIQueue,
   triggerSearchReindex,
-  triggerVideoWorker,
   triggerWebhook,
+  VideoWorkerQueue,
   zCrawlLinkRequestSchema,
 } from "@karakeep/shared/queues";
 import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -1034,26 +1034,43 @@ async function runCrawler(job: DequeuedJob) {
     job.abortSignal,
   );
 
+  // Propagate priority to child jobs
+  const enqueueOpts: EnqueueOptions = {
+    priority: job.priority,
+  };
+
   // Enqueue openai job (if not set, assume it's true for backward compatibility)
   if (job.data.runInference !== false) {
-    await OpenAIQueue.enqueue({
-      bookmarkId,
-      type: "tag",
-    });
-    await OpenAIQueue.enqueue({
-      bookmarkId,
-      type: "summarize",
-    });
+    await OpenAIQueue.enqueue(
+      {
+        bookmarkId,
+        type: "tag",
+      },
+      enqueueOpts,
+    );
+    await OpenAIQueue.enqueue(
+      {
+        bookmarkId,
+        type: "summarize",
+      },
+      enqueueOpts,
+    );
   }
 
   // Update the search index
-  await triggerSearchReindex(bookmarkId);
+  await triggerSearchReindex(bookmarkId, enqueueOpts);
 
   // Trigger a potential download of a video from the URL
-  await triggerVideoWorker(bookmarkId, url);
+  await VideoWorkerQueue.enqueue(
+    {
+      bookmarkId,
+      url,
+    },
+    enqueueOpts,
+  );
 
   // Trigger a webhook
-  await triggerWebhook(bookmarkId, "crawled");
+  await triggerWebhook(bookmarkId, "crawled", undefined, enqueueOpts);
 
   // Do the archival as a separate last step as it has the potential for failure
   await archivalLogic();
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 2d9c8ef7..c7b68faf 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -127,5 +127,7 @@ URL: ${link.url ?? ""}
     })
     .where(eq(bookmarks.id, bookmarkId));
 
-  await triggerSearchReindex(bookmarkId);
+  await triggerSearchReindex(bookmarkId, {
+    priority: job.priority,
+  });
 }
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 271eea4b..3c7b5adb 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,5 +1,5 @@
 import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob } from "liteque";
+import { DequeuedJob, EnqueueOptions } from "liteque";
 import { buildImpersonatingTRPCClient } from "trpc";
 import { z } from "zod";
 
@@ -434,9 +434,14 @@ export async function runTagging(
   await connectTags(bookmarkId, tags, bookmark.userId);
 
+  // Propagate priority to child jobs
+  const enqueueOpts: EnqueueOptions = {
+    priority: job.priority,
+  };
+
   // Trigger a webhook
-  await triggerWebhook(bookmarkId, "ai tagged");
+  await triggerWebhook(bookmarkId, "ai tagged", undefined, enqueueOpts);
 
   // Update the search index
-  await triggerSearchReindex(bookmarkId);
+  await triggerSearchReindex(bookmarkId, enqueueOpts);
 }
-- 
cgit v1.2.3-70-g09d2
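
Note on scope: because this view is limited to apps/workers, the patch above shows only the consumer half of the fix. Each worker now forwards the priority of the job it dequeued (req.priority / job.priority) to every child job it enqueues via liteque's EnqueueOptions, so a high-priority crawl keeps its priority through tagging, summarization, search reindexing, video download, and webhooks. The producer half, where user-added links are enqueued ahead of bulk imports, lives outside this directory and is not part of this diff. The TypeScript sketch below illustrates what that producer side could look like; the enqueueCrawl helper, the payload shape, and the concrete priority numbers are hypothetical, and whether a smaller or larger number dequeues first is defined by liteque, not by this patch.

// Hypothetical producer-side sketch; not code from this commit. Only the
// Queue.enqueue(payload, { priority }) shape is taken from the diff above;
// the helper name, payload fields, and priority values are assumptions.
import { LinkCrawlerQueue } from "@karakeep/shared/queues";

type CrawlSource = "user" | "bulk-import";

export async function enqueueCrawl(bookmarkId: string, source: CrawlSource) {
  await LinkCrawlerQueue.enqueue(
    // Assumed minimal payload; the real crawl request schema may carry
    // additional fields (e.g. runInference).
    { bookmarkId },
    // Assumed convention: user-added links get a priority value that
    // dequeues ahead of bulk imports (check liteque for the real ordering).
    { priority: source === "user" ? 0 : 10 },
  );
}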