aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
Diffstat (limited to 'apps')
-rw-r--r--apps/web/components/settings/ImportExport.tsx2
-rw-r--r--apps/workers/package.json2
-rw-r--r--apps/workers/workers/assetPreprocessingWorker.ts19
-rw-r--r--apps/workers/workers/crawlerWorker.ts43
-rw-r--r--apps/workers/workers/inference/summarize.ts4
-rw-r--r--apps/workers/workers/inference/tagging.ts11
6 files changed, 57 insertions, 24 deletions
diff --git a/apps/web/components/settings/ImportExport.tsx b/apps/web/components/settings/ImportExport.tsx
index e859f8c2..94703876 100644
--- a/apps/web/components/settings/ImportExport.tsx
+++ b/apps/web/components/settings/ImportExport.tsx
@@ -131,6 +131,8 @@ export function ImportExportRow() {
throw new Error("Content is undefined");
}
const created = await createBookmark({
+ // This is important to avoid blocking the crawling of more important bookmarks
+ crawlPriority: "low",
title: bookmark.title,
createdAt: bookmark.addDate
? new Date(bookmark.addDate * 1000)
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 595a6e00..4f080169 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -17,7 +17,7 @@
"drizzle-orm": "^0.38.3",
"execa": "9.3.1",
"jsdom": "^24.0.0",
- "liteque": "^0.3.2",
+ "liteque": "^0.4.1",
"metascraper": "^5.46.18",
"metascraper-amazon": "^5.46.18",
"metascraper-author": "5.46.18",
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index 8231ce50..c3ecd1e0 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,6 +1,6 @@
import os from "os";
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
@@ -345,13 +345,20 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
);
}
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: req.priority,
+ };
if (!isFixMode || anythingChanged) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "tag",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
}
}
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 428ec0f5..edd1d8f1 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -10,7 +10,7 @@ import { eq } from "drizzle-orm";
import { execa } from "execa";
import { isShuttingDown } from "exit";
import { JSDOM, VirtualConsole } from "jsdom";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import metascraper from "metascraper";
import metascraperAmazon from "metascraper-amazon";
import metascraperAuthor from "metascraper-author";
@@ -56,8 +56,8 @@ import {
LinkCrawlerQueue,
OpenAIQueue,
triggerSearchReindex,
- triggerVideoWorker,
triggerWebhook,
+ VideoWorkerQueue,
zCrawlLinkRequestSchema,
} from "@karakeep/shared/queues";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -1034,26 +1034,43 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
job.abortSignal,
);
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: job.priority,
+ };
+
// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "tag",
- });
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "summarize",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "summarize",
+ },
+ enqueueOpts,
+ );
}
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
// Trigger a potential download of a video from the URL
- await triggerVideoWorker(bookmarkId, url);
+ await VideoWorkerQueue.enqueue(
+ {
+ bookmarkId,
+ url,
+ },
+ enqueueOpts,
+ );
// Trigger a webhook
- await triggerWebhook(bookmarkId, "crawled");
+ await triggerWebhook(bookmarkId, "crawled", undefined, enqueueOpts);
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 2d9c8ef7..c7b68faf 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -127,5 +127,7 @@ URL: ${link.url ?? ""}
})
.where(eq(bookmarks.id, bookmarkId));
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, {
+ priority: job.priority,
+ });
}
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 271eea4b..3c7b5adb 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,5 +1,5 @@
import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob } from "liteque";
+import { DequeuedJob, EnqueueOptions } from "liteque";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
@@ -434,9 +434,14 @@ export async function runTagging(
await connectTags(bookmarkId, tags, bookmark.userId);
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: job.priority,
+ };
+
// Trigger a webhook
- await triggerWebhook(bookmarkId, "ai tagged");
+ await triggerWebhook(bookmarkId, "ai tagged", undefined, enqueueOpts);
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
}