about summary refs log tree commit diff stats
path: root/apps/workers
diff options
context:
space:
mode:
author Mohamed Bassem <me@mbassem.com> 2025-07-12 13:06:46 +0000
committer Mohamed Bassem <me@mbassem.com> 2025-07-12 13:06:46 +0000
commit 9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f (patch)
tree 099f263c00e55d97d3d4b1464fdd116d917f70c0 /apps/workers
parent 8e3013ba96532cab61eb6e5fae2ce30be5e94a57 (diff)
download karakeep-9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f.tar.zst
fix: Prioritize crawling user added links over bulk imports. fixes #1717
Diffstat (limited to 'apps/workers')
-rw-r--r-- apps/workers/package.json 2
-rw-r--r-- apps/workers/workers/assetPreprocessingWorker.ts 19
-rw-r--r-- apps/workers/workers/crawlerWorker.ts 43
-rw-r--r-- apps/workers/workers/inference/summarize.ts 4
-rw-r--r-- apps/workers/workers/inference/tagging.ts 11
5 files changed, 55 insertions, 24 deletions
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 595a6e00..4f080169 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -17,7 +17,7 @@
"drizzle-orm": "^0.38.3",
"execa": "9.3.1",
"jsdom": "^24.0.0",
- "liteque": "^0.3.2",
+ "liteque": "^0.4.1",
"metascraper": "^5.46.18",
"metascraper-amazon": "^5.46.18",
"metascraper-author": "5.46.18",
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index 8231ce50..c3ecd1e0 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,6 +1,6 @@
import os from "os";
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
@@ -345,13 +345,20 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
);
}
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: req.priority,
+ };
if (!isFixMode || anythingChanged) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "tag",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
}
}
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 428ec0f5..edd1d8f1 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -10,7 +10,7 @@ import { eq } from "drizzle-orm";
import { execa } from "execa";
import { isShuttingDown } from "exit";
import { JSDOM, VirtualConsole } from "jsdom";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import metascraper from "metascraper";
import metascraperAmazon from "metascraper-amazon";
import metascraperAuthor from "metascraper-author";
@@ -56,8 +56,8 @@ import {
LinkCrawlerQueue,
OpenAIQueue,
triggerSearchReindex,
- triggerVideoWorker,
triggerWebhook,
+ VideoWorkerQueue,
zCrawlLinkRequestSchema,
} from "@karakeep/shared/queues";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -1034,26 +1034,43 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
job.abortSignal,
);
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: job.priority,
+ };
+
// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "tag",
- });
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "summarize",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "summarize",
+ },
+ enqueueOpts,
+ );
}
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
// Trigger a potential download of a video from the URL
- await triggerVideoWorker(bookmarkId, url);
+ await VideoWorkerQueue.enqueue(
+ {
+ bookmarkId,
+ url,
+ },
+ enqueueOpts,
+ );
// Trigger a webhook
- await triggerWebhook(bookmarkId, "crawled");
+ await triggerWebhook(bookmarkId, "crawled", undefined, enqueueOpts);
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 2d9c8ef7..c7b68faf 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -127,5 +127,7 @@ URL: ${link.url ?? ""}
})
.where(eq(bookmarks.id, bookmarkId));
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, {
+ priority: job.priority,
+ });
}
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 271eea4b..3c7b5adb 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,5 +1,5 @@
import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob } from "liteque";
+import { DequeuedJob, EnqueueOptions } from "liteque";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
@@ -434,9 +434,14 @@ export async function runTagging(
await connectTags(bookmarkId, tags, bookmark.userId);
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: job.priority,
+ };
+
// Trigger a webhook
- await triggerWebhook(bookmarkId, "ai tagged");
+ await triggerWebhook(bookmarkId, "ai tagged", undefined, enqueueOpts);
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
}