about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
-rw-r--r-- apps/web/components/settings/ImportExport.tsx | 2
-rw-r--r-- apps/workers/package.json | 2
-rw-r--r-- apps/workers/workers/assetPreprocessingWorker.ts | 19
-rw-r--r-- apps/workers/workers/crawlerWorker.ts | 43
-rw-r--r-- apps/workers/workers/inference/summarize.ts | 4
-rw-r--r-- apps/workers/workers/inference/tagging.ts | 11
-rw-r--r-- packages/open-api/karakeep-openapi-spec.json | 7
-rw-r--r-- packages/shared/package.json | 2
-rw-r--r-- packages/shared/queues.ts | 65
-rw-r--r-- packages/shared/types/bookmarks.ts | 3
-rw-r--r-- packages/trpc/routers/admin.ts | 10
-rw-r--r-- packages/trpc/routers/bookmarks.ts | 67
-rw-r--r-- packages/trpc/testUtils.ts | 1
-rw-r--r-- pnpm-lock.yaml | 14
14 files changed, 159 insertions, 91 deletions
diff --git a/apps/web/components/settings/ImportExport.tsx b/apps/web/components/settings/ImportExport.tsx
index e859f8c2..94703876 100644
--- a/apps/web/components/settings/ImportExport.tsx
+++ b/apps/web/components/settings/ImportExport.tsx
@@ -131,6 +131,8 @@ export function ImportExportRow() {
throw new Error("Content is undefined");
}
const created = await createBookmark({
+ // This is important to avoid blocking the crawling of more important bookmarks
+ crawlPriority: "low",
title: bookmark.title,
createdAt: bookmark.addDate
? new Date(bookmark.addDate * 1000)
diff --git a/apps/workers/package.json b/apps/workers/package.json
index 595a6e00..4f080169 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -17,7 +17,7 @@
"drizzle-orm": "^0.38.3",
"execa": "9.3.1",
"jsdom": "^24.0.0",
- "liteque": "^0.3.2",
+ "liteque": "^0.4.1",
"metascraper": "^5.46.18",
"metascraper-amazon": "^5.46.18",
"metascraper-author": "5.46.18",
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index 8231ce50..c3ecd1e0 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -1,6 +1,6 @@
import os from "os";
import { eq } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import PDFParser from "pdf2json";
import { fromBuffer } from "pdf2pic";
import { createWorker } from "tesseract.js";
@@ -345,13 +345,20 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
);
}
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: req.priority,
+ };
if (!isFixMode || anythingChanged) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "tag",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
}
}
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 428ec0f5..edd1d8f1 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -10,7 +10,7 @@ import { eq } from "drizzle-orm";
import { execa } from "execa";
import { isShuttingDown } from "exit";
import { JSDOM, VirtualConsole } from "jsdom";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob, EnqueueOptions, Runner } from "liteque";
import metascraper from "metascraper";
import metascraperAmazon from "metascraper-amazon";
import metascraperAuthor from "metascraper-author";
@@ -56,8 +56,8 @@ import {
LinkCrawlerQueue,
OpenAIQueue,
triggerSearchReindex,
- triggerVideoWorker,
triggerWebhook,
+ VideoWorkerQueue,
zCrawlLinkRequestSchema,
} from "@karakeep/shared/queues";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
@@ -1034,26 +1034,43 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
job.abortSignal,
);
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: job.priority,
+ };
+
// Enqueue openai job (if not set, assume it's true for backward compatibility)
if (job.data.runInference !== false) {
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "tag",
- });
- await OpenAIQueue.enqueue({
- bookmarkId,
- type: "summarize",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId,
+ type: "summarize",
+ },
+ enqueueOpts,
+ );
}
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
// Trigger a potential download of a video from the URL
- await triggerVideoWorker(bookmarkId, url);
+ await VideoWorkerQueue.enqueue(
+ {
+ bookmarkId,
+ url,
+ },
+ enqueueOpts,
+ );
// Trigger a webhook
- await triggerWebhook(bookmarkId, "crawled");
+ await triggerWebhook(bookmarkId, "crawled", undefined, enqueueOpts);
// Do the archival as a separate last step as it has the potential for failure
await archivalLogic();
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 2d9c8ef7..c7b68faf 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -127,5 +127,7 @@ URL: ${link.url ?? ""}
})
.where(eq(bookmarks.id, bookmarkId));
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, {
+ priority: job.priority,
+ });
}
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 271eea4b..3c7b5adb 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,5 +1,5 @@
import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob } from "liteque";
+import { DequeuedJob, EnqueueOptions } from "liteque";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
@@ -434,9 +434,14 @@ export async function runTagging(
await connectTags(bookmarkId, tags, bookmark.userId);
+ // Propagate priority to child jobs
+ const enqueueOpts: EnqueueOptions = {
+ priority: job.priority,
+ };
+
// Trigger a webhook
- await triggerWebhook(bookmarkId, "ai tagged");
+ await triggerWebhook(bookmarkId, "ai tagged", undefined, enqueueOpts);
// Update the search index
- await triggerSearchReindex(bookmarkId);
+ await triggerSearchReindex(bookmarkId, enqueueOpts);
}
diff --git a/packages/open-api/karakeep-openapi-spec.json b/packages/open-api/karakeep-openapi-spec.json
index db1e09d0..ac74abbf 100644
--- a/packages/open-api/karakeep-openapi-spec.json
+++ b/packages/open-api/karakeep-openapi-spec.json
@@ -686,6 +686,13 @@
"createdAt": {
"type": "string",
"nullable": true
+ },
+ "crawlPriority": {
+ "type": "string",
+ "enum": [
+ "low",
+ "normal"
+ ]
}
}
},
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 0210e24f..fbdd6651 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -9,7 +9,7 @@
"glob": "^11.0.0",
"html-to-text": "^9.0.5",
"js-tiktoken": "^1.0.20",
- "liteque": "^0.3.2",
+ "liteque": "^0.4.1",
"meilisearch": "^0.37.0",
"nodemailer": "^7.0.4",
"ollama": "^0.5.14",
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index a2fdc6b4..6cc5dd83 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -1,5 +1,5 @@
import path from "node:path";
-import { buildDBClient, migrateDB, SqliteQueue } from "liteque";
+import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque";
import { z } from "zod";
import serverConfig from "./config";
@@ -86,25 +86,17 @@ export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>(
},
);
-export async function triggerSearchReindex(bookmarkId: string) {
- await SearchIndexingQueue.enqueue({
- bookmarkId,
- type: "index",
- });
-}
-
-export async function triggerSearchDeletion(bookmarkId: string) {
- await SearchIndexingQueue.enqueue({
- bookmarkId: bookmarkId,
- type: "delete",
- });
-}
-
-export async function triggerReprocessingFixMode(bookmarkId: string) {
- await AssetPreprocessingQueue.enqueue({
- bookmarkId,
- fixMode: true,
- });
+export async function triggerSearchReindex(
+ bookmarkId: string,
+ opts?: EnqueueOptions,
+) {
+ await SearchIndexingQueue.enqueue(
+ {
+ bookmarkId,
+ type: "index",
+ },
+ opts,
+ );
}
export const zvideoRequestSchema = z.object({
@@ -124,13 +116,6 @@ export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>(
},
);
-export async function triggerVideoWorker(bookmarkId: string, url: string) {
- await VideoWorkerQueue.enqueue({
- bookmarkId,
- url,
- });
-}
-
// Feed Worker
export const zFeedRequestSchema = z.object({
feedId: z.string(),
@@ -191,12 +176,16 @@ export async function triggerWebhook(
bookmarkId: string,
operation: ZWebhookRequest["operation"],
userId?: string,
+ opts?: EnqueueOptions,
) {
- await WebhookQueue.enqueue({
- bookmarkId,
- userId,
- operation,
- });
+ await WebhookQueue.enqueue(
+ {
+ bookmarkId,
+ userId,
+ operation,
+ },
+ opts,
+ );
}
// RuleEngine worker
@@ -219,9 +208,13 @@ export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>(
export async function triggerRuleEngineOnEvent(
bookmarkId: string,
events: z.infer<typeof zRuleEngineEventSchema>[],
+ opts?: EnqueueOptions,
) {
- await RuleEngineQueue.enqueue({
- events,
- bookmarkId,
- });
+ await RuleEngineQueue.enqueue(
+ {
+ events,
+ bookmarkId,
+ },
+ opts,
+ );
}
diff --git a/packages/shared/types/bookmarks.ts b/packages/shared/types/bookmarks.ts
index f648bce5..f96cf0c5 100644
--- a/packages/shared/types/bookmarks.ts
+++ b/packages/shared/types/bookmarks.ts
@@ -139,6 +139,9 @@ export const zNewBookmarkRequestSchema = z
note: z.string().optional(),
summary: z.string().optional(),
createdAt: z.coerce.date().optional(),
+ // A mechanism to prioritize crawling of bookmarks depending on whether
+ // they were created by a user interaction or by a bulk import.
+ crawlPriority: z.enum(["low", "normal"]).optional(),
})
.and(
z.discriminatedUnion("type", [
diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts
index 5e169857..2935f2e8 100644
--- a/packages/trpc/routers/admin.ts
+++ b/packages/trpc/routers/admin.ts
@@ -11,7 +11,6 @@ import {
OpenAIQueue,
SearchIndexingQueue,
TidyAssetsQueue,
- triggerReprocessingFixMode,
triggerSearchReindex,
VideoWorkerQueue,
WebhookQueue,
@@ -238,7 +237,14 @@ export const adminAppRouter = router({
},
});
- await Promise.all(bookmarkIds.map((b) => triggerReprocessingFixMode(b.id)));
+ await Promise.all(
+ bookmarkIds.map((b) =>
+ AssetPreprocessingQueue.enqueue({
+ bookmarkId: b.id,
+ fixMode: true,
+ }),
+ ),
+ );
}),
reRunInferenceOnAllBookmarks: adminProcedure
.input(
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 77f40878..b6acadd6 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -1,5 +1,6 @@
import { experimental_trpcMiddleware, TRPCError } from "@trpc/server";
import { and, count, eq, gt, inArray, lt, or } from "drizzle-orm";
+import { EnqueueOptions } from "liteque";
import invariant from "tiny-invariant";
import { z } from "zod";
@@ -32,8 +33,8 @@ import {
AssetPreprocessingQueue,
LinkCrawlerQueue,
OpenAIQueue,
+ SearchIndexingQueue,
triggerRuleEngineOnEvent,
- triggerSearchDeletion,
triggerSearchReindex,
triggerWebhook,
} from "@karakeep/shared/queues";
@@ -420,37 +421,60 @@ export const bookmarksAppRouter = router({
};
});
+ const enqueueOpts: EnqueueOptions = {
+ // The lower the priority number, the sooner the job will be processed
+ priority: input.crawlPriority === "low" ? 50 : 0,
+ };
+
// Enqueue crawling request
switch (bookmark.content.type) {
case BookmarkTypes.LINK: {
// The crawling job triggers openai when it's done
- await LinkCrawlerQueue.enqueue({
- bookmarkId: bookmark.id,
- });
+ await LinkCrawlerQueue.enqueue(
+ {
+ bookmarkId: bookmark.id,
+ },
+ enqueueOpts,
+ );
break;
}
case BookmarkTypes.TEXT: {
- await OpenAIQueue.enqueue({
- bookmarkId: bookmark.id,
- type: "tag",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId: bookmark.id,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
break;
}
case BookmarkTypes.ASSET: {
- await AssetPreprocessingQueue.enqueue({
- bookmarkId: bookmark.id,
- fixMode: false,
- });
+ await AssetPreprocessingQueue.enqueue(
+ {
+ bookmarkId: bookmark.id,
+ fixMode: false,
+ },
+ enqueueOpts,
+ );
break;
}
}
- await triggerRuleEngineOnEvent(bookmark.id, [
- {
- type: "bookmarkAdded",
- },
- ]);
- await triggerSearchReindex(bookmark.id);
- await triggerWebhook(bookmark.id, "created");
+ await triggerRuleEngineOnEvent(
+ bookmark.id,
+ [
+ {
+ type: "bookmarkAdded",
+ },
+ ],
+ enqueueOpts,
+ );
+ await triggerSearchReindex(bookmark.id, enqueueOpts);
+ await triggerWebhook(
+ bookmark.id,
+ "created",
+ /* userId */ undefined,
+ enqueueOpts,
+ );
return bookmark;
}),
@@ -671,7 +695,10 @@ export const bookmarksAppRouter = router({
eq(bookmarks.id, input.bookmarkId),
),
);
- await triggerSearchDeletion(input.bookmarkId);
+ await SearchIndexingQueue.enqueue({
+ bookmarkId: input.bookmarkId,
+ type: "delete",
+ });
await triggerWebhook(input.bookmarkId, "deleted", ctx.user.id);
if (deleted.changes > 0 && bookmark) {
await cleanupAssetForBookmark({
diff --git a/packages/trpc/testUtils.ts b/packages/trpc/testUtils.ts
index ee9d1d42..4d4bbde5 100644
--- a/packages/trpc/testUtils.ts
+++ b/packages/trpc/testUtils.ts
@@ -87,7 +87,6 @@ export function defaultBeforeEach(seedDB = true) {
triggerRuleEngineOnEvent: vi.fn(),
triggerSearchReindex: vi.fn(),
triggerWebhook: vi.fn(),
- triggerSearchDeletion: vi.fn(),
}));
Object.assign(context, await buildTestContext(seedDB));
};
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index 75c46a1d..6523a51e 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -791,8 +791,8 @@ importers:
specifier: ^24.0.0
version: 24.1.3
liteque:
- specifier: ^0.3.2
- version: 0.3.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1)
+ specifier: ^0.4.1
+ version: 0.4.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1)
metascraper:
specifier: ^5.46.18
version: 5.47.1
@@ -1116,8 +1116,8 @@ importers:
specifier: ^1.0.20
version: 1.0.20
liteque:
- specifier: ^0.3.2
- version: 0.3.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1)
+ specifier: ^0.4.1
+ version: 0.4.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1)
meilisearch:
specifier: ^0.37.0
version: 0.37.0(encoding@0.1.13)
@@ -9606,8 +9606,8 @@ packages:
resolution: {integrity: sha512-wUayTU8MS827Dam6MxgD72Ui+KOSF+u/eIqpatOtjnvgJ0+mnDq33uC2M7J0tPK+upe/DpUAuK4JUU89iBoNKQ==}
engines: {node: '>=4'}
- liteque@0.3.2:
- resolution: {integrity: sha512-adBWSpayJ+Pfl0q5/AL4uehYvrLHAaDiqMsccWNGDZo2xgZ0LaZeDomttVBoS4ZLBTDBxDXxgaYoCodD/s1AsA==}
+ liteque@0.4.1:
+ resolution: {integrity: sha512-Z99xHyEiLBDJemV8fWF04IJwMxs6AM+aTvarNpAsmiBRuDhvafT19e5hYmo6Ru6nPrkNuwOQzQbE7BmF4MyFsA==}
peerDependencies:
better-sqlite3: '>=7'
@@ -25821,7 +25821,7 @@ snapshots:
liquid-json@0.3.1: {}
- liteque@0.3.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1):
+ liteque@0.4.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1):
dependencies:
async-mutex: 0.4.1
better-sqlite3: 11.3.0