diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-07-12 13:06:46 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-07-12 13:06:46 +0000 |
| commit | 9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f (patch) | |
| tree | 099f263c00e55d97d3d4b1464fdd116d917f70c0 /apps/workers | |
| parent | 8e3013ba96532cab61eb6e5fae2ce30be5e94a57 (diff) | |
| download | karakeep-9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f.tar.zst | |
fix: Prioritize crawling user-added links over bulk imports. fixes #1717
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/package.json | 2 | ||||
| -rw-r--r-- | apps/workers/workers/assetPreprocessingWorker.ts | 19 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 43 | ||||
| -rw-r--r-- | apps/workers/workers/inference/summarize.ts | 4 | ||||
| -rw-r--r-- | apps/workers/workers/inference/tagging.ts | 11 |
5 files changed, 55 insertions, 24 deletions
diff --git a/apps/workers/package.json b/apps/workers/package.json index 595a6e00..4f080169 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -17,7 +17,7 @@ "drizzle-orm": "^0.38.3", "execa": "9.3.1", "jsdom": "^24.0.0", - "liteque": "^0.3.2", + "liteque": "^0.4.1", "metascraper": "^5.46.18", "metascraper-amazon": "^5.46.18", "metascraper-author": "5.46.18", diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index 8231ce50..c3ecd1e0 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -1,6 +1,6 @@ import os from "os"; import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; +import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; @@ -345,13 +345,20 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) { ); } + // Propagate priority to child jobs + const enqueueOpts: EnqueueOptions = { + priority: req.priority, + }; if (!isFixMode || anythingChanged) { - await OpenAIQueue.enqueue({ - bookmarkId, - type: "tag", - }); + await OpenAIQueue.enqueue( + { + bookmarkId, + type: "tag", + }, + enqueueOpts, + ); // Update the search index - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, enqueueOpts); } } diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 428ec0f5..edd1d8f1 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -10,7 +10,7 @@ import { eq } from "drizzle-orm"; import { execa } from "execa"; import { isShuttingDown } from "exit"; import { JSDOM, VirtualConsole } from "jsdom"; -import { DequeuedJob, Runner } from "liteque"; +import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import metascraper from "metascraper"; import 
metascraperAmazon from "metascraper-amazon"; import metascraperAuthor from "metascraper-author"; @@ -56,8 +56,8 @@ import { LinkCrawlerQueue, OpenAIQueue, triggerSearchReindex, - triggerVideoWorker, triggerWebhook, + VideoWorkerQueue, zCrawlLinkRequestSchema, } from "@karakeep/shared/queues"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; @@ -1034,26 +1034,43 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) { job.abortSignal, ); + // Propagate priority to child jobs + const enqueueOpts: EnqueueOptions = { + priority: job.priority, + }; + // Enqueue openai job (if not set, assume it's true for backward compatibility) if (job.data.runInference !== false) { - await OpenAIQueue.enqueue({ - bookmarkId, - type: "tag", - }); - await OpenAIQueue.enqueue({ - bookmarkId, - type: "summarize", - }); + await OpenAIQueue.enqueue( + { + bookmarkId, + type: "tag", + }, + enqueueOpts, + ); + await OpenAIQueue.enqueue( + { + bookmarkId, + type: "summarize", + }, + enqueueOpts, + ); } // Update the search index - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, enqueueOpts); // Trigger a potential download of a video from the URL - await triggerVideoWorker(bookmarkId, url); + await VideoWorkerQueue.enqueue( + { + bookmarkId, + url, + }, + enqueueOpts, + ); // Trigger a webhook - await triggerWebhook(bookmarkId, "crawled"); + await triggerWebhook(bookmarkId, "crawled", undefined, enqueueOpts); // Do the archival as a separate last step as it has the potential for failure await archivalLogic(); diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 2d9c8ef7..c7b68faf 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -127,5 +127,7 @@ URL: ${link.url ?? 
""} }) .where(eq(bookmarks.id, bookmarkId)); - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, { + priority: job.priority, + }); } diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 271eea4b..3c7b5adb 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -1,5 +1,5 @@ import { and, Column, eq, inArray, sql } from "drizzle-orm"; -import { DequeuedJob } from "liteque"; +import { DequeuedJob, EnqueueOptions } from "liteque"; import { buildImpersonatingTRPCClient } from "trpc"; import { z } from "zod"; @@ -434,9 +434,14 @@ export async function runTagging( await connectTags(bookmarkId, tags, bookmark.userId); + // Propagate priority to child jobs + const enqueueOpts: EnqueueOptions = { + priority: job.priority, + }; + // Trigger a webhook - await triggerWebhook(bookmarkId, "ai tagged"); + await triggerWebhook(bookmarkId, "ai tagged", undefined, enqueueOpts); // Update the search index - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, enqueueOpts); } |
