diff options
Diffstat (limited to '')
| -rw-r--r-- | packages/trpc/routers/bookmarks.ts | 574 |
1 files changed, 306 insertions, 268 deletions
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 15ded2bd..782566cf 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -1,6 +1,5 @@ import { experimental_trpcMiddleware, TRPCError } from "@trpc/server"; import { and, eq, gt, inArray, lt, or } from "drizzle-orm"; -import invariant from "tiny-invariant"; import { z } from "zod"; import type { ZBookmarkContent } from "@karakeep/shared/types/bookmarks"; @@ -15,11 +14,14 @@ import { bookmarkTexts, customPrompts, tagsOnBookmarks, + users, } from "@karakeep/db/schema"; import { AssetPreprocessingQueue, LinkCrawlerQueue, + LowPriorityCrawlerQueue, OpenAIQueue, + QueuePriority, QuotaService, triggerRuleEngineOnEvent, triggerSearchReindex, @@ -28,7 +30,7 @@ import { import { SUPPORTED_BOOKMARK_ASSET_TYPES } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import { InferenceClientFactory } from "@karakeep/shared/inference"; -import { buildSummaryPrompt } from "@karakeep/shared/prompts"; +import { buildSummaryPrompt } from "@karakeep/shared/prompts.server"; import { EnqueueOptions } from "@karakeep/shared/queueing"; import { FilterQuery, getSearchClient } from "@karakeep/shared/search"; import { parseSearchQuery } from "@karakeep/shared/searchQueryParser"; @@ -49,9 +51,8 @@ import { normalizeTagName } from "@karakeep/shared/utils/tag"; import type { AuthedContext } from "../index"; import { authedProcedure, createRateLimitMiddleware, router } from "../index"; import { getBookmarkIdsFromMatcher } from "../lib/search"; +import { Asset } from "../models/assets"; import { BareBookmark, Bookmark } from "../models/bookmarks"; -import { ImportSession } from "../models/importSessions"; -import { ensureAssetOwnership } from "./assets"; export const ensureBookmarkOwnership = experimental_trpcMiddleware<{ ctx: AuthedContext; @@ -121,173 +122,173 @@ export const bookmarksAppRouter = router({ // This doesn't 100% protect from duplicates because of races, but it's more than enough for this usecase. const alreadyExists = await attemptToDedupLink(ctx, input.url); if (alreadyExists) { - if (input.importSessionId) { - const session = await ImportSession.fromId( - ctx, - input.importSessionId, - ); - await session.attachBookmark(alreadyExists.id); - } return { ...alreadyExists, alreadyExists: true }; } } - // Check user quota - const quotaResult = await QuotaService.canCreateBookmark( - ctx.db, - ctx.user.id, - ); - if (!quotaResult.result) { - throw new TRPCError({ - code: "FORBIDDEN", - message: quotaResult.error, - }); - } - - const bookmark = await ctx.db.transaction(async (tx) => { - const bookmark = ( - await tx - .insert(bookmarks) - .values({ - userId: ctx.user.id, - title: input.title, - type: input.type, - archived: input.archived, - favourited: input.favourited, - note: input.note, - summary: input.summary, - createdAt: input.createdAt, - source: input.source, - }) - .returning() - )[0]; + const bookmark = await ctx.db.transaction( + async (tx) => { + // Check user quota + const quotaResult = await QuotaService.canCreateBookmark( + tx, + ctx.user.id, + ); + if (!quotaResult.result) { + throw new TRPCError({ + code: "FORBIDDEN", + message: quotaResult.error, + }); + } + const bookmark = ( + await tx + .insert(bookmarks) + .values({ + userId: ctx.user.id, + title: input.title, + type: input.type, + archived: input.archived, + favourited: input.favourited, + note: input.note, + summary: input.summary, + createdAt: input.createdAt, + source: input.source, + // Only links currently support summarization. Let's set the status to null for other types for now. + summarizationStatus: + input.type === BookmarkTypes.LINK ? "pending" : null, + }) + .returning() + )[0]; - let content: ZBookmarkContent; + let content: ZBookmarkContent; - switch (input.type) { - case BookmarkTypes.LINK: { - const link = ( - await tx - .insert(bookmarkLinks) + switch (input.type) { + case BookmarkTypes.LINK: { + const link = ( + await tx + .insert(bookmarkLinks) + .values({ + id: bookmark.id, + url: input.url.trim(), + }) + .returning() + )[0]; + if (input.precrawledArchiveId) { + await Asset.ensureOwnership(ctx, input.precrawledArchiveId); + await tx + .update(assets) + .set({ + bookmarkId: bookmark.id, + assetType: AssetTypes.LINK_PRECRAWLED_ARCHIVE, + }) + .where( + and( + eq(assets.id, input.precrawledArchiveId), + eq(assets.userId, ctx.user.id), + ), + ); + } + content = { + type: BookmarkTypes.LINK, + ...link, + }; + break; + } + case BookmarkTypes.TEXT: { + const text = ( + await tx + .insert(bookmarkTexts) + .values({ + id: bookmark.id, + text: input.text, + sourceUrl: input.sourceUrl, + }) + .returning() + )[0]; + content = { + type: BookmarkTypes.TEXT, + text: text.text ?? "", + sourceUrl: text.sourceUrl, + }; + break; + } + case BookmarkTypes.ASSET: { + const [asset] = await tx + .insert(bookmarkAssets) .values({ id: bookmark.id, - url: input.url.trim(), + assetType: input.assetType, + assetId: input.assetId, + content: null, + metadata: null, + fileName: input.fileName ?? null, + sourceUrl: null, }) - .returning() - )[0]; - if (input.precrawledArchiveId) { - await ensureAssetOwnership({ - ctx, - assetId: input.precrawledArchiveId, - }); + .returning(); + const uploadedAsset = await Asset.fromId(ctx, input.assetId); + uploadedAsset.ensureOwnership(); + if ( + !uploadedAsset.asset.contentType || + !SUPPORTED_BOOKMARK_ASSET_TYPES.has( + uploadedAsset.asset.contentType, + ) + ) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Unsupported asset type", + }); + } await tx .update(assets) .set({ bookmarkId: bookmark.id, - assetType: AssetTypes.LINK_PRECRAWLED_ARCHIVE, + assetType: AssetTypes.BOOKMARK_ASSET, }) .where( and( - eq(assets.id, input.precrawledArchiveId), + eq(assets.id, input.assetId), eq(assets.userId, ctx.user.id), ), ); + content = { + type: BookmarkTypes.ASSET, + assetType: asset.assetType, + assetId: asset.assetId, + }; + break; } - content = { - type: BookmarkTypes.LINK, - ...link, - }; - break; } - case BookmarkTypes.TEXT: { - const text = ( - await tx - .insert(bookmarkTexts) - .values({ - id: bookmark.id, - text: input.text, - sourceUrl: input.sourceUrl, - }) - .returning() - )[0]; - content = { - type: BookmarkTypes.TEXT, - text: text.text ?? "", - sourceUrl: text.sourceUrl, - }; - break; - } - case BookmarkTypes.ASSET: { - const [asset] = await tx - .insert(bookmarkAssets) - .values({ - id: bookmark.id, - assetType: input.assetType, - assetId: input.assetId, - content: null, - metadata: null, - fileName: input.fileName ?? null, - sourceUrl: null, - }) - .returning(); - const uploadedAsset = await ensureAssetOwnership({ - ctx, - assetId: input.assetId, - }); - if ( - !uploadedAsset.contentType || - !SUPPORTED_BOOKMARK_ASSET_TYPES.has(uploadedAsset.contentType) - ) { - throw new TRPCError({ - code: "BAD_REQUEST", - message: "Unsupported asset type", - }); - } - await tx - .update(assets) - .set({ - bookmarkId: bookmark.id, - assetType: AssetTypes.BOOKMARK_ASSET, - }) - .where( - and( - eq(assets.id, input.assetId), - eq(assets.userId, ctx.user.id), - ), - ); - content = { - type: BookmarkTypes.ASSET, - assetType: asset.assetType, - assetId: asset.assetId, - }; - break; - } - } - return { - alreadyExists: false, - tags: [] as ZBookmarkTags[], - assets: [], - content, - ...bookmark, - }; - }); - - if (input.importSessionId) { - const session = await ImportSession.fromId(ctx, input.importSessionId); - await session.attachBookmark(bookmark.id); - } + return { + alreadyExists: false, + tags: [] as ZBookmarkTags[], + assets: [], + content, + ...bookmark, + }; + }, + { + behavior: "immediate", + }, + ); const enqueueOpts: EnqueueOptions = { // The lower the priority number, the sooner the job will be processed - priority: input.crawlPriority === "low" ? 50 : 0, + priority: + input.crawlPriority === "low" + ? QueuePriority.Low + : QueuePriority.Default, groupId: ctx.user.id, }; switch (bookmark.content.type) { case BookmarkTypes.LINK: { // The crawling job triggers openai when it's done - await LinkCrawlerQueue.enqueue( + // Use a separate queue for low priority crawling to avoid impacting main queue parallelism + const crawlerQueue = + input.crawlPriority === "low" + ? LowPriorityCrawlerQueue + : LinkCrawlerQueue; + await crawlerQueue.enqueue( { bookmarkId: bookmark.id, }, @@ -317,22 +318,24 @@ export const bookmarksAppRouter = router({ } } - await triggerRuleEngineOnEvent( - bookmark.id, - [ - { - type: "bookmarkAdded", - }, - ], - enqueueOpts, - ); - await triggerSearchReindex(bookmark.id, enqueueOpts); - await triggerWebhook( - bookmark.id, - "created", - /* userId */ undefined, - enqueueOpts, - ); + await Promise.all([ + triggerRuleEngineOnEvent( + bookmark.id, + [ + { + type: "bookmarkAdded", + }, + ], + enqueueOpts, + ), + triggerSearchReindex(bookmark.id, enqueueOpts), + triggerWebhook( + bookmark.id, + "created", + /* userId */ undefined, + enqueueOpts, + ), + ]); return bookmark; }), @@ -487,13 +490,14 @@ export const bookmarksAppRouter = router({ })), ); } - // Trigger re-indexing and webhooks - await triggerSearchReindex(input.bookmarkId, { - groupId: ctx.user.id, - }); - await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { - groupId: ctx.user.id, - }); + await Promise.all([ + triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }), + triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }), + ]); return updatedBookmark; }), @@ -532,12 +536,14 @@ export const bookmarksAppRouter = router({ ), ); }); - await triggerSearchReindex(input.bookmarkId, { - groupId: ctx.user.id, - }); - await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { - groupId: ctx.user.id, - }); + await Promise.all([ + triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }), + triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }), + ]); }), deleteBookmark: authedProcedure @@ -559,24 +565,20 @@ export const bookmarksAppRouter = router({ z.object({ bookmarkId: z.string(), archiveFullPage: z.boolean().optional().default(false), + storePdf: z.boolean().optional().default(false), }), ) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { - await ctx.db - .update(bookmarkLinks) - .set({ - crawlStatus: "pending", - crawlStatusCode: null, - }) - .where(eq(bookmarkLinks.id, input.bookmarkId)); - await LinkCrawlerQueue.enqueue( + await LowPriorityCrawlerQueue.enqueue( { bookmarkId: input.bookmarkId, archiveFullPage: input.archiveFullPage, + storePdf: input.storePdf, }, { groupId: ctx.user.id, + priority: QueuePriority.Low, }, ); }), @@ -711,36 +713,109 @@ export const bookmarksAppRouter = router({ ) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { - const res = await ctx.db.transaction(async (tx) => { - // Detaches - const idsToRemove: string[] = []; - if (input.detach.length > 0) { - const namesToRemove: string[] = []; - input.detach.forEach((detachInfo) => { - if (detachInfo.tagId) { - idsToRemove.push(detachInfo.tagId); - } - if (detachInfo.tagName) { - namesToRemove.push(detachInfo.tagName); - } - }); + // Helper function to fetch tag IDs and their names from a list of tag identifiers + const fetchTagIdsWithNames = async ( + tagIdentifiers: { tagId?: string; tagName?: string }[], + ): Promise<{ id: string; name: string }[]> => { + const tagIds = tagIdentifiers.flatMap((t) => + t.tagId ? [t.tagId] : [], + ); + const tagNames = tagIdentifiers.flatMap((t) => + t.tagName ? [t.tagName] : [], + ); - if (namesToRemove.length > 0) { - ( - await tx.query.bookmarkTags.findMany({ - where: and( - eq(bookmarkTags.userId, ctx.user.id), - inArray(bookmarkTags.name, namesToRemove), - ), - columns: { - id: true, - }, - }) - ).forEach((tag) => { - idsToRemove.push(tag.id); - }); + // Fetch tag IDs in parallel + const [byIds, byNames] = await Promise.all([ + tagIds.length > 0 + ? ctx.db + .select({ id: bookmarkTags.id, name: bookmarkTags.name }) + .from(bookmarkTags) + .where( + and( + eq(bookmarkTags.userId, ctx.user.id), + inArray(bookmarkTags.id, tagIds), + ), + ) + : Promise.resolve([]), + tagNames.length > 0 + ? ctx.db + .select({ id: bookmarkTags.id, name: bookmarkTags.name }) + .from(bookmarkTags) + .where( + and( + eq(bookmarkTags.userId, ctx.user.id), + inArray(bookmarkTags.name, tagNames), + ), + ) + : Promise.resolve([]), + ]); + + // Union results and deduplicate by tag ID + const seen = new Set<string>(); + const results: { id: string; name: string }[] = []; + + for (const tag of [...byIds, ...byNames]) { + if (!seen.has(tag.id)) { + seen.add(tag.id); + results.push({ id: tag.id, name: tag.name }); } + } + + return results; + }; + + // Normalize tag names and create new tags outside transaction to reduce transaction duration + const normalizedAttachTags = input.attach.map((tag) => ({ + tagId: tag.tagId, + tagName: tag.tagName ? normalizeTagName(tag.tagName) : undefined, + attachedBy: tag.attachedBy, + })); + + { + // Create new tags + const toAddTagNames = normalizedAttachTags + .flatMap((i) => (i.tagName ? [i.tagName] : [])) + .filter((n) => n.length > 0); // drop empty results + + if (toAddTagNames.length > 0) { + await ctx.db + .insert(bookmarkTags) + .values( + toAddTagNames.map((name) => ({ name, userId: ctx.user.id })), + ) + .onConflictDoNothing(); + } + } + + // Fetch tag IDs for attachment/detachment now that we know that they all exist + const [attachTagsWithNames, detachTagsWithNames] = await Promise.all([ + fetchTagIdsWithNames(normalizedAttachTags), + fetchTagIdsWithNames(input.detach), + ]); + + // Build the attachedBy map from the fetched results + const tagIdToAttachedBy = new Map<string, "ai" | "human">(); + + for (const fetchedTag of attachTagsWithNames) { + // Find the corresponding input tag + const inputTag = normalizedAttachTags.find( + (t) => + (t.tagId && t.tagId === fetchedTag.id) || + (t.tagName && t.tagName === fetchedTag.name), + ); + + if (inputTag) { + tagIdToAttachedBy.set(fetchedTag.id, inputTag.attachedBy); + } + } + + // Extract just the IDs for the transaction + const allIdsToAttach = attachTagsWithNames.map((t) => t.id); + const idsToRemove = detachTagsWithNames.map((t) => t.id); + const res = await ctx.db.transaction(async (tx) => { + // Detaches + if (idsToRemove.length > 0) { await tx .delete(tagsOnBookmarks) .where( @@ -751,67 +826,21 @@ export const bookmarksAppRouter = router({ ); } - if (input.attach.length == 0) { - return { - bookmarkId: input.bookmarkId, - attached: [], - detached: idsToRemove, - }; - } - - const toAddTagNames = input.attach - .flatMap((i) => (i.tagName ? [i.tagName] : [])) - .map(normalizeTagName) // strip leading # - .filter((n) => n.length > 0); // drop empty results - - const toAddTagIds = input.attach.flatMap((i) => - i.tagId ? [i.tagId] : [], - ); - - // New Tags - if (toAddTagNames.length > 0) { + // Attach tags + if (allIdsToAttach.length > 0) { await tx - .insert(bookmarkTags) + .insert(tagsOnBookmarks) .values( - toAddTagNames.map((name) => ({ name, userId: ctx.user.id })), + allIdsToAttach.map((i) => ({ + tagId: i, + bookmarkId: input.bookmarkId, + attachedBy: tagIdToAttachedBy.get(i) ?? "human", + })), ) - .onConflictDoNothing() - .returning(); + .onConflictDoNothing(); } - // If there is nothing to add, the "or" statement will become useless and - // the query below will simply select all the existing tags for this user and assign them to the bookmark - invariant(toAddTagNames.length > 0 || toAddTagIds.length > 0); - const allIds = ( - await tx.query.bookmarkTags.findMany({ - where: and( - eq(bookmarkTags.userId, ctx.user.id), - or( - toAddTagIds.length > 0 - ? inArray(bookmarkTags.id, toAddTagIds) - : undefined, - toAddTagNames.length > 0 - ? inArray(bookmarkTags.name, toAddTagNames) - : undefined, - ), - ), - columns: { - id: true, - }, - }) - ).map((t) => t.id); - - await tx - .insert(tagsOnBookmarks) - .values( - allIds.map((i) => ({ - tagId: i, - bookmarkId: input.bookmarkId, - attachedBy: "human" as const, - userId: ctx.user.id, - })), - ) - .onConflictDoNothing(); + // Update bookmark modified timestamp await tx .update(bookmarks) .set({ modifiedAt: new Date() }) @@ -824,7 +853,7 @@ export const bookmarksAppRouter = router({ return { bookmarkId: input.bookmarkId, - attached: allIds, + attached: allIdsToAttach, detached: idsToRemove, }; }); @@ -958,8 +987,15 @@ Author: ${bookmark.author ?? ""} }, }); + const userSettings = await ctx.db.query.users.findFirst({ + where: eq(users.id, ctx.user.id), + columns: { + inferredTagLang: true, + }, + }); + const summaryPrompt = await buildSummaryPrompt( - serverConfig.inference.inferredTagLang, + userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang, prompts.map((p) => p.text), bookmarkDetails, serverConfig.inference.contextLength, @@ -981,12 +1017,14 @@ Author: ${bookmark.author ?? ""} summary: summary.response, }) .where(eq(bookmarks.id, input.bookmarkId)); - await triggerSearchReindex(input.bookmarkId, { - groupId: ctx.user.id, - }); - await triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { - groupId: ctx.user.id, - }); + await Promise.all([ + triggerSearchReindex(input.bookmarkId, { + groupId: ctx.user.id, + }), + triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { + groupId: ctx.user.id, + }), + ]); return { bookmarkId: input.bookmarkId, |
