diff options
| -rw-r--r-- | apps/web/components/settings/ImportExport.tsx | 2 | ||||
| -rw-r--r-- | apps/workers/package.json | 2 | ||||
| -rw-r--r-- | apps/workers/workers/assetPreprocessingWorker.ts | 19 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 43 | ||||
| -rw-r--r-- | apps/workers/workers/inference/summarize.ts | 4 | ||||
| -rw-r--r-- | apps/workers/workers/inference/tagging.ts | 11 | ||||
| -rw-r--r-- | packages/open-api/karakeep-openapi-spec.json | 7 | ||||
| -rw-r--r-- | packages/shared/package.json | 2 | ||||
| -rw-r--r-- | packages/shared/queues.ts | 65 | ||||
| -rw-r--r-- | packages/shared/types/bookmarks.ts | 3 | ||||
| -rw-r--r-- | packages/trpc/routers/admin.ts | 10 | ||||
| -rw-r--r-- | packages/trpc/routers/bookmarks.ts | 67 | ||||
| -rw-r--r-- | packages/trpc/testUtils.ts | 1 | ||||
| -rw-r--r-- | pnpm-lock.yaml | 14 |
14 files changed, 159 insertions, 91 deletions
diff --git a/apps/web/components/settings/ImportExport.tsx b/apps/web/components/settings/ImportExport.tsx index e859f8c2..94703876 100644 --- a/apps/web/components/settings/ImportExport.tsx +++ b/apps/web/components/settings/ImportExport.tsx @@ -131,6 +131,8 @@ export function ImportExportRow() { throw new Error("Content is undefined"); } const created = await createBookmark({ + // This is important to avoid blocking the crawling of more important bookmarks + crawlPriority: "low", title: bookmark.title, createdAt: bookmark.addDate ? new Date(bookmark.addDate * 1000) diff --git a/apps/workers/package.json b/apps/workers/package.json index 595a6e00..4f080169 100644 --- a/apps/workers/package.json +++ b/apps/workers/package.json @@ -17,7 +17,7 @@ "drizzle-orm": "^0.38.3", "execa": "9.3.1", "jsdom": "^24.0.0", - "liteque": "^0.3.2", + "liteque": "^0.4.1", "metascraper": "^5.46.18", "metascraper-amazon": "^5.46.18", "metascraper-author": "5.46.18", diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index 8231ce50..c3ecd1e0 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -1,6 +1,6 @@ import os from "os"; import { eq } from "drizzle-orm"; -import { DequeuedJob, Runner } from "liteque"; +import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import PDFParser from "pdf2json"; import { fromBuffer } from "pdf2pic"; import { createWorker } from "tesseract.js"; @@ -345,13 +345,20 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) { ); } + // Propagate priority to child jobs + const enqueueOpts: EnqueueOptions = { + priority: req.priority, + }; if (!isFixMode || anythingChanged) { - await OpenAIQueue.enqueue({ - bookmarkId, - type: "tag", - }); + await OpenAIQueue.enqueue( + { + bookmarkId, + type: "tag", + }, + enqueueOpts, + ); // Update the search index - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, enqueueOpts); } } diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 428ec0f5..edd1d8f1 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -10,7 +10,7 @@ import { eq } from "drizzle-orm"; import { execa } from "execa"; import { isShuttingDown } from "exit"; import { JSDOM, VirtualConsole } from "jsdom"; -import { DequeuedJob, Runner } from "liteque"; +import { DequeuedJob, EnqueueOptions, Runner } from "liteque"; import metascraper from "metascraper"; import metascraperAmazon from "metascraper-amazon"; import metascraperAuthor from "metascraper-author"; @@ -56,8 +56,8 @@ import { LinkCrawlerQueue, OpenAIQueue, triggerSearchReindex, - triggerVideoWorker, triggerWebhook, + VideoWorkerQueue, zCrawlLinkRequestSchema, } from "@karakeep/shared/queues"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; @@ -1034,26 +1034,43 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) { job.abortSignal, ); + // Propagate priority to child jobs + const enqueueOpts: EnqueueOptions = { + priority: job.priority, + }; + // Enqueue openai job (if not set, assume it's true for backward compatibility) if (job.data.runInference !== false) { - await OpenAIQueue.enqueue({ - bookmarkId, - type: "tag", - }); - await OpenAIQueue.enqueue({ - bookmarkId, - type: "summarize", - }); + await OpenAIQueue.enqueue( + { + bookmarkId, + type: "tag", + }, + enqueueOpts, + ); + await OpenAIQueue.enqueue( + { + bookmarkId, + type: "summarize", + }, + enqueueOpts, + ); } // Update the search index - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, enqueueOpts); // Trigger a potential download of a video from the URL - await triggerVideoWorker(bookmarkId, url); + await VideoWorkerQueue.enqueue( + { + bookmarkId, + url, + }, + enqueueOpts, + ); // Trigger a webhook - await triggerWebhook(bookmarkId, "crawled"); + await triggerWebhook(bookmarkId, "crawled", undefined, enqueueOpts); // Do the archival as a separate last step as it has the potential for failure await archivalLogic(); diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts index 2d9c8ef7..c7b68faf 100644 --- a/apps/workers/workers/inference/summarize.ts +++ b/apps/workers/workers/inference/summarize.ts @@ -127,5 +127,7 @@ URL: ${link.url ?? ""} }) .where(eq(bookmarks.id, bookmarkId)); - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, { + priority: job.priority, + }); } diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts index 271eea4b..3c7b5adb 100644 --- a/apps/workers/workers/inference/tagging.ts +++ b/apps/workers/workers/inference/tagging.ts @@ -1,5 +1,5 @@ import { and, Column, eq, inArray, sql } from "drizzle-orm"; -import { DequeuedJob } from "liteque"; +import { DequeuedJob, EnqueueOptions } from "liteque"; import { buildImpersonatingTRPCClient } from "trpc"; import { z } from "zod"; @@ -434,9 +434,14 @@ export async function runTagging( await connectTags(bookmarkId, tags, bookmark.userId); + // Propagate priority to child jobs + const enqueueOpts: EnqueueOptions = { + priority: job.priority, + }; + // Trigger a webhook - await triggerWebhook(bookmarkId, "ai tagged"); + await triggerWebhook(bookmarkId, "ai tagged", undefined, enqueueOpts); // Update the search index - await triggerSearchReindex(bookmarkId); + await triggerSearchReindex(bookmarkId, enqueueOpts); } diff --git a/packages/open-api/karakeep-openapi-spec.json b/packages/open-api/karakeep-openapi-spec.json index db1e09d0..ac74abbf 100644 --- a/packages/open-api/karakeep-openapi-spec.json +++ b/packages/open-api/karakeep-openapi-spec.json @@ -686,6 +686,13 @@ "createdAt": { "type": "string", "nullable": true + }, + "crawlPriority": { + "type": "string", + "enum": [ + "low", + "normal" + ] } } }, diff --git a/packages/shared/package.json b/packages/shared/package.json index 0210e24f..fbdd6651 100644 --- a/packages/shared/package.json +++ b/packages/shared/package.json @@ -9,7 +9,7 @@ "glob": "^11.0.0", "html-to-text": "^9.0.5", "js-tiktoken": "^1.0.20", - "liteque": "^0.3.2", + "liteque": "^0.4.1", "meilisearch": "^0.37.0", "nodemailer": "^7.0.4", "ollama": "^0.5.14", diff --git a/packages/shared/queues.ts b/packages/shared/queues.ts index a2fdc6b4..6cc5dd83 100644 --- a/packages/shared/queues.ts +++ b/packages/shared/queues.ts @@ -1,5 +1,5 @@ import path from "node:path"; -import { buildDBClient, migrateDB, SqliteQueue } from "liteque"; +import { buildDBClient, EnqueueOptions, migrateDB, SqliteQueue } from "liteque"; import { z } from "zod"; import serverConfig from "./config"; @@ -86,25 +86,17 @@ export const TidyAssetsQueue = new SqliteQueue<ZTidyAssetsRequest>( }, ); -export async function triggerSearchReindex(bookmarkId: string) { - await SearchIndexingQueue.enqueue({ - bookmarkId, - type: "index", - }); -} - -export async function triggerSearchDeletion(bookmarkId: string) { - await SearchIndexingQueue.enqueue({ - bookmarkId: bookmarkId, - type: "delete", - }); -} - -export async function triggerReprocessingFixMode(bookmarkId: string) { - await AssetPreprocessingQueue.enqueue({ - bookmarkId, - fixMode: true, - }); +export async function triggerSearchReindex( + bookmarkId: string, + opts?: EnqueueOptions, +) { + await SearchIndexingQueue.enqueue( + { + bookmarkId, + type: "index", + }, + opts, + ); } export const zvideoRequestSchema = z.object({ @@ -124,13 +116,6 @@ export const VideoWorkerQueue = new SqliteQueue<ZVideoRequest>( }, ); -export async function triggerVideoWorker(bookmarkId: string, url: string) { - await VideoWorkerQueue.enqueue({ - bookmarkId, - url, - }); -} - // Feed Worker export const zFeedRequestSchema = z.object({ feedId: z.string(), @@ -191,12 +176,16 @@ export async function triggerWebhook( bookmarkId: string, operation: ZWebhookRequest["operation"], userId?: string, + opts?: EnqueueOptions, ) { - await WebhookQueue.enqueue({ - bookmarkId, - userId, - operation, - }); + await WebhookQueue.enqueue( + { + bookmarkId, + userId, + operation, + }, + opts, + ); } // RuleEngine worker @@ -219,9 +208,13 @@ export const RuleEngineQueue = new SqliteQueue<ZRuleEngineRequest>( export async function triggerRuleEngineOnEvent( bookmarkId: string, events: z.infer<typeof zRuleEngineEventSchema>[], + opts?: EnqueueOptions, ) { - await RuleEngineQueue.enqueue({ - events, - bookmarkId, - }); + await RuleEngineQueue.enqueue( + { + events, + bookmarkId, + }, + opts, + ); } diff --git a/packages/shared/types/bookmarks.ts b/packages/shared/types/bookmarks.ts index f648bce5..f96cf0c5 100644 --- a/packages/shared/types/bookmarks.ts +++ b/packages/shared/types/bookmarks.ts @@ -139,6 +139,9 @@ export const zNewBookmarkRequestSchema = z note: z.string().optional(), summary: z.string().optional(), createdAt: z.coerce.date().optional(), + // A mechanism to prioritize crawling of bookmarks depending on whether + // they were created by a user interaction or by a bulk import. + crawlPriority: z.enum(["low", "normal"]).optional(), }) .and( z.discriminatedUnion("type", [ diff --git a/packages/trpc/routers/admin.ts b/packages/trpc/routers/admin.ts index 5e169857..2935f2e8 100644 --- a/packages/trpc/routers/admin.ts +++ b/packages/trpc/routers/admin.ts @@ -11,7 +11,6 @@ import { OpenAIQueue, SearchIndexingQueue, TidyAssetsQueue, - triggerReprocessingFixMode, triggerSearchReindex, VideoWorkerQueue, WebhookQueue, @@ -238,7 +237,14 @@ export const adminAppRouter = router({ }, }); - await Promise.all(bookmarkIds.map((b) => triggerReprocessingFixMode(b.id))); + await Promise.all( + bookmarkIds.map((b) => + AssetPreprocessingQueue.enqueue({ + bookmarkId: b.id, + fixMode: true, + }), + ), + ); }), reRunInferenceOnAllBookmarks: adminProcedure .input( diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 77f40878..b6acadd6 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -1,5 +1,6 @@ import { experimental_trpcMiddleware, TRPCError } from "@trpc/server"; import { and, count, eq, gt, inArray, lt, or } from "drizzle-orm"; +import { EnqueueOptions } from "liteque"; import invariant from "tiny-invariant"; import { z } from "zod"; @@ -32,8 +33,8 @@ import { AssetPreprocessingQueue, LinkCrawlerQueue, OpenAIQueue, + SearchIndexingQueue, triggerRuleEngineOnEvent, - triggerSearchDeletion, triggerSearchReindex, triggerWebhook, } from "@karakeep/shared/queues"; @@ -420,37 +421,60 @@ export const bookmarksAppRouter = router({ }; }); + const enqueueOpts: EnqueueOptions = { + // The lower the priority number, the sooner the job will be processed + priority: input.crawlPriority === "low" ? 50 : 0, + }; + // Enqueue crawling request switch (bookmark.content.type) { case BookmarkTypes.LINK: { // The crawling job triggers openai when it's done - await LinkCrawlerQueue.enqueue({ - bookmarkId: bookmark.id, - }); + await LinkCrawlerQueue.enqueue( + { + bookmarkId: bookmark.id, + }, + enqueueOpts, + ); break; } case BookmarkTypes.TEXT: { - await OpenAIQueue.enqueue({ - bookmarkId: bookmark.id, - type: "tag", - }); + await OpenAIQueue.enqueue( + { + bookmarkId: bookmark.id, + type: "tag", + }, + enqueueOpts, + ); break; } case BookmarkTypes.ASSET: { - await AssetPreprocessingQueue.enqueue({ - bookmarkId: bookmark.id, - fixMode: false, - }); + await AssetPreprocessingQueue.enqueue( + { + bookmarkId: bookmark.id, + fixMode: false, + }, + enqueueOpts, + ); break; } } - await triggerRuleEngineOnEvent(bookmark.id, [ - { - type: "bookmarkAdded", - }, - ]); - await triggerSearchReindex(bookmark.id); - await triggerWebhook(bookmark.id, "created"); + await triggerRuleEngineOnEvent( + bookmark.id, + [ + { + type: "bookmarkAdded", + }, + ], + enqueueOpts, + ); + await triggerSearchReindex(bookmark.id, enqueueOpts); + await triggerWebhook( + bookmark.id, + "created", + /* userId */ undefined, + enqueueOpts, + ); return bookmark; }), @@ -671,7 +695,10 @@ export const bookmarksAppRouter = router({ eq(bookmarks.id, input.bookmarkId), ), ); - await triggerSearchDeletion(input.bookmarkId); + await SearchIndexingQueue.enqueue({ + bookmarkId: input.bookmarkId, + type: "delete", + }); await triggerWebhook(input.bookmarkId, "deleted", ctx.user.id); if (deleted.changes > 0 && bookmark) { await cleanupAssetForBookmark({ diff --git a/packages/trpc/testUtils.ts b/packages/trpc/testUtils.ts index ee9d1d42..4d4bbde5 100644 --- a/packages/trpc/testUtils.ts +++ b/packages/trpc/testUtils.ts @@ -87,7 +87,6 @@ export function defaultBeforeEach(seedDB = true) { triggerRuleEngineOnEvent: vi.fn(), triggerSearchReindex: vi.fn(), triggerWebhook: vi.fn(), - triggerSearchDeletion: vi.fn(), })); Object.assign(context, await buildTestContext(seedDB)); }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 75c46a1d..6523a51e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -791,8 +791,8 @@ importers: specifier: ^24.0.0 version: 24.1.3 liteque: - specifier: ^0.3.2 - version: 0.3.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1) + specifier: ^0.4.1 + version: 0.4.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1) metascraper: specifier: ^5.46.18 version: 5.47.1 @@ -1116,8 +1116,8 @@ importers: specifier: ^1.0.20 version: 1.0.20 liteque: - specifier: ^0.3.2 - version: 0.3.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1) + specifier: ^0.4.1 + version: 0.4.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1) meilisearch: specifier: ^0.37.0 version: 0.37.0(encoding@0.1.13) @@ -9606,8 +9606,8 @@ packages: resolution: {integrity: sha512-wUayTU8MS827Dam6MxgD72Ui+KOSF+u/eIqpatOtjnvgJ0+mnDq33uC2M7J0tPK+upe/DpUAuK4JUU89iBoNKQ==} engines: {node: '>=4'} - liteque@0.3.2: - resolution: {integrity: sha512-adBWSpayJ+Pfl0q5/AL4uehYvrLHAaDiqMsccWNGDZo2xgZ0LaZeDomttVBoS4ZLBTDBxDXxgaYoCodD/s1AsA==} + liteque@0.4.1: + resolution: {integrity: sha512-Z99xHyEiLBDJemV8fWF04IJwMxs6AM+aTvarNpAsmiBRuDhvafT19e5hYmo6Ru6nPrkNuwOQzQbE7BmF4MyFsA==} peerDependencies: better-sqlite3: '>=7' @@ -25821,7 +25821,7 @@ snapshots: liquid-json@0.3.1: {} - liteque@0.3.2(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1): + liteque@0.4.1(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/react@19.1.6)(better-sqlite3@11.3.0)(react@18.3.1): dependencies: async-mutex: 0.4.1 better-sqlite3: 11.3.0 |
