about summary refs log tree commit diff stats
path: root/packages
diff options
context:
space:
mode:
author Mohamed Bassem <me@mbassem.com> 2025-07-12 13:06:46 +0000
committer Mohamed Bassem <me@mbassem.com> 2025-07-12 13:06:46 +0000
commit 9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f (patch)
tree 099f263c00e55d97d3d4b1464fdd116d917f70c0 /packages
parent 8e3013ba96532cab61eb6e5fae2ce30be5e94a57 (diff)
download karakeep-9fb3ef6f6d0d7fff6d9aa59a0dc2407ad8e4eb3f.tar.zst
fix: Prioritize crawling user added links over bulk imports. fixes #1717
Diffstat (limited to 'packages')
-rw-r--r-- packages/open-api/karakeep-openapi-spec.json 7
-rw-r--r-- packages/shared/package.json 2
-rw-r--r-- packages/shared/queues.ts 65
-rw-r--r-- packages/shared/types/bookmarks.ts 3
-rw-r--r-- packages/trpc/routers/admin.ts 10
-rw-r--r-- packages/trpc/routers/bookmarks.ts 67
-rw-r--r-- packages/trpc/testUtils.ts 1
7 files changed, 95 insertions, 60 deletions
diff --git a/packages/open-api/karakeep-openapi-spec.json b/packages/open-api/karakeep-openapi-spec.json
index db1e09d0..ac74abbf 100644
--- a/packages/open-api/karakeep-openapi-spec.json
+++ b/packages/open-api/karakeep-openapi-spec.json
@@ -686,6 +686,13 @@
"createdAt": {
"type": "string",
"nullable": true
+ },
+ "crawlPriority": {
+ "type": "string",
+ "enum": [
+ "low",
+ "normal"
+ ]
}
}
},
diff --git a/packages/shared/package.json b/packages/shared/package.json
index 0210e24f..fbdd6651 100644
--- a/packages/shared/package.json
+++ b/packages/shared/package.json
@@ -9,7 +9,7 @@
"glob": "^11.0.0",
"html-to-text": "^9.0.5",
"js-tiktoken": "^1.0.20",
- "liteque": "^0.3.2",
+ "liteque": "^0.4.1",
"meilisearch": "^0.37.0",
"nodemailer": "^7.0.4",
"ollama": "^0.5.14",
diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts
index a2fdc6b4..6cc5dd83 100644
--- a/packages/shared/queues.ts
+++ b/packages/shared/queues.ts
@@ -1,5 +1,5 @@
import path from "node:path";
-import { buildDBClient, migrateDB, SqliteQueue } from "liteque";
+import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque";
import { z } from "zod";
import serverConfig from "./config";
@@ -86,25 +86,17 @@ export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>(
},
);
-export async function triggerSearchReindex(bookmarkId: string) {
- await SearchIndexingQueue.enqueue({
- bookmarkId,
- type: "index",
- });
-}
-
-export async function triggerSearchDeletion(bookmarkId: string) {
- await SearchIndexingQueue.enqueue({
- bookmarkId: bookmarkId,
- type: "delete",
- });
-}
-
-export async function triggerReprocessingFixMode(bookmarkId: string) {
- await AssetPreprocessingQueue.enqueue({
- bookmarkId,
- fixMode: true,
- });
+export async function triggerSearchReindex(
+ bookmarkId: string,
+ opts?: EnqueueOptions,
+) {
+ await SearchIndexingQueue.enqueue(
+ {
+ bookmarkId,
+ type: "index",
+ },
+ opts,
+ );
}
export const zvideoRequestSchema = z.object({
@@ -124,13 +116,6 @@ export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>(
},
);
-export async function triggerVideoWorker(bookmarkId: string, url: string) {
- await VideoWorkerQueue.enqueue({
- bookmarkId,
- url,
- });
-}
-
// Feed Worker
export const zFeedRequestSchema = z.object({
feedId: z.string(),
@@ -191,12 +176,16 @@ export async function triggerWebhook(
bookmarkId: string,
operation: ZWebhookRequest["operation"],
userId?: string,
+ opts?: EnqueueOptions,
) {
- await WebhookQueue.enqueue({
- bookmarkId,
- userId,
- operation,
- });
+ await WebhookQueue.enqueue(
+ {
+ bookmarkId,
+ userId,
+ operation,
+ },
+ opts,
+ );
}
// RuleEngine worker
@@ -219,9 +208,13 @@ export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>(
export async function triggerRuleEngineOnEvent(
bookmarkId: string,
events: z.infer<typeof zRuleEngineEventSchema>[],
+ opts?: EnqueueOptions,
) {
- await RuleEngineQueue.enqueue({
- events,
- bookmarkId,
- });
+ await RuleEngineQueue.enqueue(
+ {
+ events,
+ bookmarkId,
+ },
+ opts,
+ );
}
diff --git a/packages/shared/types/bookmarks.ts b/packages/shared/types/bookmarks.ts
index f648bce5..f96cf0c5 100644
--- a/packages/shared/types/bookmarks.ts
+++ b/packages/shared/types/bookmarks.ts
@@ -139,6 +139,9 @@ export const zNewBookmarkRequestSchema = z
note: z.string().optional(),
summary: z.string().optional(),
createdAt: z.coerce.date().optional(),
+ // A mechanism to prioritize crawling of bookmarks depending on whether
+ // they were created by a user interaction or by a bulk import.
+ crawlPriority: z.enum(["low", "normal"]).optional(),
})
.and(
z.discriminatedUnion("type", [
diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts
index 5e169857..2935f2e8 100644
--- a/packages/trpc/routers/admin.ts
+++ b/packages/trpc/routers/admin.ts
@@ -11,7 +11,6 @@ import {
OpenAIQueue,
SearchIndexingQueue,
TidyAssetsQueue,
- triggerReprocessingFixMode,
triggerSearchReindex,
VideoWorkerQueue,
WebhookQueue,
@@ -238,7 +237,14 @@ export const adminAppRouter = router({
},
});
- await Promise.all(bookmarkIds.map((b) => triggerReprocessingFixMode(b.id)));
+ await Promise.all(
+ bookmarkIds.map((b) =>
+ AssetPreprocessingQueue.enqueue({
+ bookmarkId: b.id,
+ fixMode: true,
+ }),
+ ),
+ );
}),
reRunInferenceOnAllBookmarks: adminProcedure
.input(
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 77f40878..b6acadd6 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -1,5 +1,6 @@
import { experimental_trpcMiddleware, TRPCError } from "@trpc/server";
import { and, count, eq, gt, inArray, lt, or } from "drizzle-orm";
+import { EnqueueOptions } from "liteque";
import invariant from "tiny-invariant";
import { z } from "zod";
@@ -32,8 +33,8 @@ import {
AssetPreprocessingQueue,
LinkCrawlerQueue,
OpenAIQueue,
+ SearchIndexingQueue,
triggerRuleEngineOnEvent,
- triggerSearchDeletion,
triggerSearchReindex,
triggerWebhook,
} from "@karakeep/shared/queues";
@@ -420,37 +421,60 @@ export const bookmarksAppRouter = router({
};
});
+ const enqueueOpts: EnqueueOptions = {
+ // The lower the priority number, the sooner the job will be processed
+ priority: input.crawlPriority === "low" ? 50 : 0,
+ };
+
// Enqueue crawling request
switch (bookmark.content.type) {
case BookmarkTypes.LINK: {
// The crawling job triggers openai when it's done
- await LinkCrawlerQueue.enqueue({
- bookmarkId: bookmark.id,
- });
+ await LinkCrawlerQueue.enqueue(
+ {
+ bookmarkId: bookmark.id,
+ },
+ enqueueOpts,
+ );
break;
}
case BookmarkTypes.TEXT: {
- await OpenAIQueue.enqueue({
- bookmarkId: bookmark.id,
- type: "tag",
- });
+ await OpenAIQueue.enqueue(
+ {
+ bookmarkId: bookmark.id,
+ type: "tag",
+ },
+ enqueueOpts,
+ );
break;
}
case BookmarkTypes.ASSET: {
- await AssetPreprocessingQueue.enqueue({
- bookmarkId: bookmark.id,
- fixMode: false,
- });
+ await AssetPreprocessingQueue.enqueue(
+ {
+ bookmarkId: bookmark.id,
+ fixMode: false,
+ },
+ enqueueOpts,
+ );
break;
}
}
- await triggerRuleEngineOnEvent(bookmark.id, [
- {
- type: "bookmarkAdded",
- },
- ]);
- await triggerSearchReindex(bookmark.id);
- await triggerWebhook(bookmark.id, "created");
+ await triggerRuleEngineOnEvent(
+ bookmark.id,
+ [
+ {
+ type: "bookmarkAdded",
+ },
+ ],
+ enqueueOpts,
+ );
+ await triggerSearchReindex(bookmark.id, enqueueOpts);
+ await triggerWebhook(
+ bookmark.id,
+ "created",
+ /* userId */ undefined,
+ enqueueOpts,
+ );
return bookmark;
}),
@@ -671,7 +695,10 @@ export const bookmarksAppRouter = router({
eq(bookmarks.id, input.bookmarkId),
),
);
- await triggerSearchDeletion(input.bookmarkId);
+ await SearchIndexingQueue.enqueue({
+ bookmarkId: input.bookmarkId,
+ type: "delete",
+ });
await triggerWebhook(input.bookmarkId, "deleted", ctx.user.id);
if (deleted.changes > 0 && bookmark) {
await cleanupAssetForBookmark({
diff --git a/packages/trpc/testUtils.ts b/packages/trpc/testUtils.ts
index ee9d1d42..4d4bbde5 100644
--- a/packages/trpc/testUtils.ts
+++ b/packages/trpc/testUtils.ts
@@ -87,7 +87,6 @@ export function defaultBeforeEach(seedDB = true) {
triggerRuleEngineOnEvent: vi.fn(),
triggerSearchReindex: vi.fn(),
triggerWebhook: vi.fn(),
- triggerSearchDeletion: vi.fn(),
}));
Object.assign(context, await buildTestContext(seedDB));
};