import { experimental_trpcMiddleware, TRPCError } from "@trpc/server"; import { and, eq, gt, inArray, lt, or } from "drizzle-orm"; import { z } from "zod"; import type { ZBookmarkContent } from "@karakeep/shared/types/bookmarks"; import type { ZBookmarkTags } from "@karakeep/shared/types/tags"; import { assets, AssetTypes, bookmarkAssets, bookmarkLinks, bookmarks, bookmarkTags, bookmarkTexts, customPrompts, tagsOnBookmarks, users, } from "@karakeep/db/schema"; import { AssetPreprocessingQueue, LinkCrawlerQueue, LowPriorityCrawlerQueue, OpenAIQueue, QueuePriority, QuotaService, triggerRuleEngineOnEvent, triggerSearchReindex, triggerWebhook, } from "@karakeep/shared-server"; import { SUPPORTED_BOOKMARK_ASSET_TYPES } from "@karakeep/shared/assetdb"; import serverConfig from "@karakeep/shared/config"; import { InferenceClientFactory } from "@karakeep/shared/inference"; import { buildSummaryPrompt } from "@karakeep/shared/prompts.server"; import { EnqueueOptions } from "@karakeep/shared/queueing"; import { FilterQuery, getSearchClient } from "@karakeep/shared/search"; import { parseSearchQuery } from "@karakeep/shared/searchQueryParser"; import { BookmarkTypes, DEFAULT_NUM_BOOKMARKS_PER_PAGE, zBookmarkSchema, zGetBookmarksRequestSchema, zGetBookmarksResponseSchema, zManipulatedTagSchema, zNewBookmarkRequestSchema, zSearchBookmarksCursor, zSearchBookmarksRequestSchema, zUpdateBookmarksRequestSchema, } from "@karakeep/shared/types/bookmarks"; import { normalizeTagName } from "@karakeep/shared/utils/tag"; import type { AuthedContext } from "../index"; import { authedProcedure, createRateLimitMiddleware, router } from "../index"; import { getBookmarkIdsFromMatcher } from "../lib/search"; import { Asset } from "../models/assets"; import { BareBookmark, Bookmark } from "../models/bookmarks"; export const ensureBookmarkOwnership = experimental_trpcMiddleware<{ ctx: AuthedContext; input: { bookmarkId: string }; }>().create(async (opts) => { const bookmark = await BareBookmark.bareFromId( opts.ctx, opts.input.bookmarkId, ); bookmark.ensureOwnership(); return opts.next({ ctx: { ...opts.ctx, bookmark, }, }); }); export const ensureBookmarkAccess = experimental_trpcMiddleware<{ ctx: AuthedContext; input: { bookmarkId: string }; }>().create(async (opts) => { // Throws if bookmark doesn't exist or user doesn't have access const bookmark = await BareBookmark.bareFromId( opts.ctx, opts.input.bookmarkId, ); return opts.next({ ctx: { ...opts.ctx, bookmark, }, }); }); async function attemptToDedupLink(ctx: AuthedContext, url: string) { const result = await ctx.db .select({ id: bookmarkLinks.id, }) .from(bookmarkLinks) .leftJoin(bookmarks, eq(bookmarks.id, bookmarkLinks.id)) .where(and(eq(bookmarkLinks.url, url), eq(bookmarks.userId, ctx.user.id))); if (result.length == 0) { return null; } return ( await Bookmark.fromId(ctx, result[0].id, /* includeContent: */ false) ).asZBookmark(); } export const bookmarksAppRouter = router({ createBookmark: authedProcedure .input(zNewBookmarkRequestSchema) .output( zBookmarkSchema.merge( z.object({ alreadyExists: z.boolean().optional().default(false), }), ), ) .mutation(async ({ input, ctx }) => { if (input.type == BookmarkTypes.LINK) { // This doesn't 100% protect from duplicates because of races, but it's more than enough for this usecase. const alreadyExists = await attemptToDedupLink(ctx, input.url); if (alreadyExists) { return { ...alreadyExists, alreadyExists: true }; } } const bookmark = await ctx.db.transaction( async (tx) => { // Check user quota const quotaResult = await QuotaService.canCreateBookmark( tx, ctx.user.id, ); if (!quotaResult.result) { throw new TRPCError({ code: "FORBIDDEN", message: quotaResult.error, }); } const bookmark = ( await tx .insert(bookmarks) .values({ userId: ctx.user.id, title: input.title, type: input.type, archived: input.archived, favourited: input.favourited, note: input.note, summary: input.summary, createdAt: input.createdAt, source: input.source, // Only links currently support summarization. Let's set the status to null for other types for now. summarizationStatus: input.type === BookmarkTypes.LINK ? "pending" : null, }) .returning() )[0]; let content: ZBookmarkContent; switch (input.type) { case BookmarkTypes.LINK: { const link = ( await tx .insert(bookmarkLinks) .values({ id: bookmark.id, url: input.url.trim(), }) .returning() )[0]; if (input.precrawledArchiveId) { await Asset.ensureOwnership(ctx, input.precrawledArchiveId); await tx .update(assets) .set({ bookmarkId: bookmark.id, assetType: AssetTypes.LINK_PRECRAWLED_ARCHIVE, }) .where( and( eq(assets.id, input.precrawledArchiveId), eq(assets.userId, ctx.user.id), ), ); } content = { type: BookmarkTypes.LINK, ...link, }; break; } case BookmarkTypes.TEXT: { const text = ( await tx .insert(bookmarkTexts) .values({ id: bookmark.id, text: input.text, sourceUrl: input.sourceUrl, }) .returning() )[0]; content = { type: BookmarkTypes.TEXT, text: text.text ?? "", sourceUrl: text.sourceUrl, }; break; } case BookmarkTypes.ASSET: { const [asset] = await tx .insert(bookmarkAssets) .values({ id: bookmark.id, assetType: input.assetType, assetId: input.assetId, content: null, metadata: null, fileName: input.fileName ?? null, sourceUrl: null, }) .returning(); const uploadedAsset = await Asset.fromId(ctx, input.assetId); uploadedAsset.ensureOwnership(); if ( !uploadedAsset.asset.contentType || !SUPPORTED_BOOKMARK_ASSET_TYPES.has( uploadedAsset.asset.contentType, ) ) { throw new TRPCError({ code: "BAD_REQUEST", message: "Unsupported asset type", }); } await tx .update(assets) .set({ bookmarkId: bookmark.id, assetType: AssetTypes.BOOKMARK_ASSET, }) .where( and( eq(assets.id, input.assetId), eq(assets.userId, ctx.user.id), ), ); content = { type: BookmarkTypes.ASSET, assetType: asset.assetType, assetId: asset.assetId, }; break; } } return { alreadyExists: false, tags: [] as ZBookmarkTags[], assets: [], content, ...bookmark, }; }, { behavior: "immediate", }, ); const enqueueOpts: EnqueueOptions = { // The lower the priority number, the sooner the job will be processed priority: input.crawlPriority === "low" ? QueuePriority.Low : QueuePriority.Default, groupId: ctx.user.id, }; switch (bookmark.content.type) { case BookmarkTypes.LINK: { // The crawling job triggers openai when it's done // Use a separate queue for low priority crawling to avoid impacting main queue parallelism const crawlerQueue = input.crawlPriority === "low" ? LowPriorityCrawlerQueue : LinkCrawlerQueue; await crawlerQueue.enqueue( { bookmarkId: bookmark.id, }, enqueueOpts, ); break; } case BookmarkTypes.TEXT: { await OpenAIQueue.enqueue( { bookmarkId: bookmark.id, type: "tag", }, enqueueOpts, ); break; } case BookmarkTypes.ASSET: { await AssetPreprocessingQueue.enqueue( { bookmarkId: bookmark.id, fixMode: false, }, enqueueOpts, ); break; } } await Promise.all([ triggerRuleEngineOnEvent( bookmark.id, [ { type: "bookmarkAdded", }, ], enqueueOpts, ), triggerSearchReindex(bookmark.id, enqueueOpts), triggerWebhook( bookmark.id, "created", /* userId */ undefined, enqueueOpts, ), ]); return bookmark; }), updateBookmark: authedProcedure .input(zUpdateBookmarksRequestSchema) .output(zBookmarkSchema) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { await ctx.db.transaction(async (tx) => { let somethingChanged = false; // Update link-specific fields if any are provided const linkUpdateData: Partial<{ url: string; description: string | null; author: string | null; publisher: string | null; datePublished: Date | null; dateModified: Date | null; }> = {}; if (input.url) { linkUpdateData.url = input.url.trim(); } if (input.description !== undefined) { linkUpdateData.description = input.description; } if (input.author !== undefined) { linkUpdateData.author = input.author; } if (input.publisher !== undefined) { linkUpdateData.publisher = input.publisher; } if (input.datePublished !== undefined) { linkUpdateData.datePublished = input.datePublished; } if (input.dateModified !== undefined) { linkUpdateData.dateModified = input.dateModified; } if (Object.keys(linkUpdateData).length > 0) { const result = await tx .update(bookmarkLinks) .set(linkUpdateData) .where(eq(bookmarkLinks.id, input.bookmarkId)); if (result.changes == 0) { throw new TRPCError({ code: "BAD_REQUEST", message: "Attempting to set link attributes for non-link type bookmark", }); } somethingChanged = true; } if (input.text) { const result = await tx .update(bookmarkTexts) .set({ text: input.text, }) .where(eq(bookmarkTexts.id, input.bookmarkId)); if (result.changes == 0) { throw new TRPCError({ code: "BAD_REQUEST", message: "Attempting to set link attributes for non-text type bookmark", }); } somethingChanged = true; } if (input.assetContent !== undefined) { const result = await tx .update(bookmarkAssets) .set({ content: input.assetContent, }) .where(and(eq(bookmarkAssets.id, input.bookmarkId))); if (result.changes == 0) { throw new TRPCError({ code: "BAD_REQUEST", message: "Attempting to set asset content for non-asset type bookmark", }); } somethingChanged = true; } // Update common bookmark fields const commonUpdateData: Partial<{ title: string | null; archived: boolean; favourited: boolean; note: string | null; summary: string | null; createdAt: Date; modifiedAt: Date; // Always update modifiedAt }> = { modifiedAt: new Date(), }; if (input.title !== undefined) { commonUpdateData.title = input.title; } if (input.archived !== undefined) { commonUpdateData.archived = input.archived; } if (input.favourited !== undefined) { commonUpdateData.favourited = input.favourited; } if (input.note !== undefined) { commonUpdateData.note = input.note; } if (input.summary !== undefined) { commonUpdateData.summary = input.summary; } if (input.createdAt !== undefined) { commonUpdateData.createdAt = input.createdAt; } if (Object.keys(commonUpdateData).length > 1 || somethingChanged) { await tx .update(bookmarks) .set(commonUpdateData) .where( and( eq(bookmarks.userId, ctx.user.id), eq(bookmarks.id, input.bookmarkId), ), ); } }); // Refetch the updated bookmark data to return the full object const updatedBookmark = ( await Bookmark.fromId( ctx, input.bookmarkId, /* includeContent: */ false, ) ).asZBookmark(); if (input.favourited === true || input.archived === true) { await triggerRuleEngineOnEvent( input.bookmarkId, [ ...(input.favourited === true ? ["favourited" as const] : []), ...(input.archived === true ? ["archived" as const] : []), ].map((t) => ({ type: t, })), ); } await Promise.all([ triggerSearchReindex(input.bookmarkId, { groupId: ctx.user.id, }), triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { groupId: ctx.user.id, }), ]); return updatedBookmark; }), // DEPRECATED: use updateBookmark instead updateBookmarkText: authedProcedure .input( z.object({ bookmarkId: z.string(), text: z.string(), }), ) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { await ctx.db.transaction(async (tx) => { const res = await tx .update(bookmarkTexts) .set({ text: input.text, }) .where(and(eq(bookmarkTexts.id, input.bookmarkId))) .returning(); if (res.length == 0) { throw new TRPCError({ code: "NOT_FOUND", message: "Bookmark not found", }); } await tx .update(bookmarks) .set({ modifiedAt: new Date() }) .where( and( eq(bookmarks.id, input.bookmarkId), eq(bookmarks.userId, ctx.user.id), ), ); }); await Promise.all([ triggerSearchReindex(input.bookmarkId, { groupId: ctx.user.id, }), triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { groupId: ctx.user.id, }), ]); }), deleteBookmark: authedProcedure .input(z.object({ bookmarkId: z.string() })) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { const bookmark = await Bookmark.fromId(ctx, input.bookmarkId, false); await bookmark.delete(); }), recrawlBookmark: authedProcedure .use( createRateLimitMiddleware({ name: "bookmarks.recrawlBookmark", windowMs: 30 * 60 * 1000, maxRequests: 200, }), ) .input( z.object({ bookmarkId: z.string(), archiveFullPage: z.boolean().optional().default(false), storePdf: z.boolean().optional().default(false), }), ) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { await LowPriorityCrawlerQueue.enqueue( { bookmarkId: input.bookmarkId, archiveFullPage: input.archiveFullPage, storePdf: input.storePdf, }, { groupId: ctx.user.id, priority: QueuePriority.Low, }, ); }), getBookmark: authedProcedure .input( z.object({ bookmarkId: z.string(), includeContent: z.boolean().optional().default(false), }), ) .output(zBookmarkSchema) .use(ensureBookmarkAccess) .query(async ({ input, ctx }) => { return ( await Bookmark.fromId(ctx, input.bookmarkId, input.includeContent) ).asZBookmark(); }), searchBookmarks: authedProcedure .input(zSearchBookmarksRequestSchema) .output( z.object({ bookmarks: z.array(zBookmarkSchema), nextCursor: zSearchBookmarksCursor.nullable(), }), ) .query(async ({ input, ctx }) => { if (!input.limit) { input.limit = DEFAULT_NUM_BOOKMARKS_PER_PAGE; } const sortOrder = input.sortOrder || "relevance"; const client = await getSearchClient(); if (!client) { throw new TRPCError({ code: "INTERNAL_SERVER_ERROR", message: "Search functionality is not configured", }); } const parsedQuery = parseSearchQuery(input.text); let filter: FilterQuery[]; if (parsedQuery.matcher) { const bookmarkIds = await getBookmarkIdsFromMatcher( ctx, parsedQuery.matcher, ); filter = [ { type: "in", field: "id", values: bookmarkIds }, { type: "eq", field: "userId", value: ctx.user.id }, ]; } else { filter = [{ type: "eq", field: "userId", value: ctx.user.id }]; } /** * preserve legacy behaviour */ const createdAtSortOrder = sortOrder === "relevance" ? "desc" : sortOrder; const resp = await client.search({ query: parsedQuery.text, filter, sort: [{ field: "createdAt", order: createdAtSortOrder }], limit: input.limit, ...(input.cursor ? { offset: input.cursor.offset, } : {}), }); if (resp.hits.length == 0) { return { bookmarks: [], nextCursor: null }; } const idToRank = resp.hits.reduce>((acc, r) => { acc[r.id] = r.score || 0; return acc; }, {}); const { bookmarks: results } = await Bookmark.loadMulti(ctx, { ids: resp.hits.map((h) => h.id), includeContent: input.includeContent, sortOrder: "desc", // Doesn't matter, we're sorting again afterwards and the list contain all data }); switch (true) { case sortOrder === "relevance": results.sort((a, b) => idToRank[b.id] - idToRank[a.id]); break; case sortOrder === "desc": results.sort((a, b) => b.createdAt.getTime() - a.createdAt.getTime()); break; case sortOrder === "asc": results.sort((a, b) => a.createdAt.getTime() - b.createdAt.getTime()); break; } return { bookmarks: results.map((b) => b.asZBookmark()), nextCursor: resp.hits.length + (input.cursor?.offset || 0) >= resp.totalHits ? null : { ver: 1 as const, offset: resp.hits.length + (input.cursor?.offset || 0), }, }; }), getBookmarks: authedProcedure .input(zGetBookmarksRequestSchema) .output(zGetBookmarksResponseSchema) .query(async ({ input, ctx }) => { const res = await Bookmark.loadMulti(ctx, input); return { bookmarks: res.bookmarks.map((b) => b.asZBookmark()), nextCursor: res.nextCursor, }; }), updateTags: authedProcedure .input( z.object({ bookmarkId: z.string(), attach: z.array(zManipulatedTagSchema), detach: z.array(zManipulatedTagSchema), }), ) .output( z.object({ attached: z.array(z.string()), detached: z.array(z.string()), }), ) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { // Helper function to fetch tag IDs and their names from a list of tag identifiers const fetchTagIdsWithNames = async ( tagIdentifiers: { tagId?: string; tagName?: string }[], ): Promise<{ id: string; name: string }[]> => { const tagIds = tagIdentifiers.flatMap((t) => t.tagId ? [t.tagId] : [], ); const tagNames = tagIdentifiers.flatMap((t) => t.tagName ? [t.tagName] : [], ); // Fetch tag IDs in parallel const [byIds, byNames] = await Promise.all([ tagIds.length > 0 ? ctx.db .select({ id: bookmarkTags.id, name: bookmarkTags.name }) .from(bookmarkTags) .where( and( eq(bookmarkTags.userId, ctx.user.id), inArray(bookmarkTags.id, tagIds), ), ) : Promise.resolve([]), tagNames.length > 0 ? ctx.db .select({ id: bookmarkTags.id, name: bookmarkTags.name }) .from(bookmarkTags) .where( and( eq(bookmarkTags.userId, ctx.user.id), inArray(bookmarkTags.name, tagNames), ), ) : Promise.resolve([]), ]); // Union results and deduplicate by tag ID const seen = new Set(); const results: { id: string; name: string }[] = []; for (const tag of [...byIds, ...byNames]) { if (!seen.has(tag.id)) { seen.add(tag.id); results.push({ id: tag.id, name: tag.name }); } } return results; }; // Normalize tag names and create new tags outside transaction to reduce transaction duration const normalizedAttachTags = input.attach.map((tag) => ({ tagId: tag.tagId, tagName: tag.tagName ? normalizeTagName(tag.tagName) : undefined, attachedBy: tag.attachedBy, })); { // Create new tags const toAddTagNames = normalizedAttachTags .flatMap((i) => (i.tagName ? [i.tagName] : [])) .filter((n) => n.length > 0); // drop empty results if (toAddTagNames.length > 0) { await ctx.db .insert(bookmarkTags) .values( toAddTagNames.map((name) => ({ name, userId: ctx.user.id })), ) .onConflictDoNothing(); } } // Fetch tag IDs for attachment/detachment now that we know that they all exist const [attachTagsWithNames, detachTagsWithNames] = await Promise.all([ fetchTagIdsWithNames(normalizedAttachTags), fetchTagIdsWithNames(input.detach), ]); // Build the attachedBy map from the fetched results const tagIdToAttachedBy = new Map(); for (const fetchedTag of attachTagsWithNames) { // Find the corresponding input tag const inputTag = normalizedAttachTags.find( (t) => (t.tagId && t.tagId === fetchedTag.id) || (t.tagName && t.tagName === fetchedTag.name), ); if (inputTag) { tagIdToAttachedBy.set(fetchedTag.id, inputTag.attachedBy); } } // Extract just the IDs for the transaction const allIdsToAttach = attachTagsWithNames.map((t) => t.id); const idsToRemove = detachTagsWithNames.map((t) => t.id); const res = await ctx.db.transaction(async (tx) => { // Detaches if (idsToRemove.length > 0) { await tx .delete(tagsOnBookmarks) .where( and( eq(tagsOnBookmarks.bookmarkId, input.bookmarkId), inArray(tagsOnBookmarks.tagId, idsToRemove), ), ); } // Attach tags if (allIdsToAttach.length > 0) { await tx .insert(tagsOnBookmarks) .values( allIdsToAttach.map((i) => ({ tagId: i, bookmarkId: input.bookmarkId, attachedBy: tagIdToAttachedBy.get(i) ?? "human", })), ) .onConflictDoNothing(); } // Update bookmark modified timestamp await tx .update(bookmarks) .set({ modifiedAt: new Date() }) .where( and( eq(bookmarks.id, input.bookmarkId), eq(bookmarks.userId, ctx.user.id), ), ); return { bookmarkId: input.bookmarkId, attached: allIdsToAttach, detached: idsToRemove, }; }); await Promise.allSettled([ triggerRuleEngineOnEvent(input.bookmarkId, [ ...res.detached.map((t) => ({ type: "tagRemoved" as const, tagId: t, })), ...res.attached.map((t) => ({ type: "tagAdded" as const, tagId: t, })), ]), triggerSearchReindex(input.bookmarkId, { groupId: ctx.user.id, }), triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { groupId: ctx.user.id, }), ]); return res; }), getBrokenLinks: authedProcedure .output( z.object({ bookmarks: z.array( z.object({ id: z.string(), url: z.string(), statusCode: z.number().nullable(), isCrawlingFailure: z.boolean(), crawledAt: z.date().nullable(), createdAt: z.date().nullable(), }), ), }), ) .query(async ({ ctx }) => { const brokenLinkBookmarks = await ctx.db .select({ id: bookmarkLinks.id, url: bookmarkLinks.url, crawlStatusCode: bookmarkLinks.crawlStatusCode, crawlingStatus: bookmarkLinks.crawlStatus, crawledAt: bookmarkLinks.crawledAt, createdAt: bookmarks.createdAt, }) .from(bookmarkLinks) .leftJoin(bookmarks, eq(bookmarks.id, bookmarkLinks.id)) .where( and( eq(bookmarks.userId, ctx.user.id), or( eq(bookmarkLinks.crawlStatus, "failure"), lt(bookmarkLinks.crawlStatusCode, 200), gt(bookmarkLinks.crawlStatusCode, 299), ), ), ); return { bookmarks: brokenLinkBookmarks.map((b) => ({ id: b.id, url: b.url, statusCode: b.crawlStatusCode, isCrawlingFailure: b.crawlingStatus === "failure", crawledAt: b.crawledAt, createdAt: b.createdAt, })), }; }), summarizeBookmark: authedProcedure .use( createRateLimitMiddleware({ name: "bookmarks.summarizeBookmark", windowMs: 30 * 60 * 1000, maxRequests: 100, }), ) .input( z.object({ bookmarkId: z.string(), }), ) .output( z.object({ summary: z.string(), }), ) .use(ensureBookmarkOwnership) .mutation(async ({ input, ctx }) => { const inferenceClient = InferenceClientFactory.build(); if (!inferenceClient) { throw new TRPCError({ code: "BAD_REQUEST", message: "No inference client configured", }); } const bookmark = await ctx.db.query.bookmarkLinks.findFirst({ where: eq(bookmarkLinks.id, input.bookmarkId), }); if (!bookmark) { throw new TRPCError({ code: "NOT_FOUND", message: "Bookmark not found or not a link", }); } const content = await Bookmark.getBookmarkPlainTextContent( bookmark, ctx.user.id, ); const bookmarkDetails = ` Title: ${bookmark.title ?? ""} Description: ${bookmark.description ?? ""} Content: ${content} Publisher: ${bookmark.publisher ?? ""} Author: ${bookmark.author ?? ""} `; const prompts = await ctx.db.query.customPrompts.findMany({ where: and( eq(customPrompts.userId, ctx.user.id), eq(customPrompts.appliesTo, "summary"), ), columns: { text: true, }, }); const userSettings = await ctx.db.query.users.findFirst({ where: eq(users.id, ctx.user.id), columns: { inferredTagLang: true, }, }); const summaryPrompt = await buildSummaryPrompt( userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang, prompts.map((p) => p.text), bookmarkDetails, serverConfig.inference.contextLength, ); const summary = await inferenceClient.inferFromText(summaryPrompt, { schema: null, }); if (!summary.response) { throw new TRPCError({ code: "INTERNAL_SERVER_ERROR", message: "Failed to summarize bookmark", }); } await ctx.db .update(bookmarks) .set({ summary: summary.response, }) .where(eq(bookmarks.id, input.bookmarkId)); await Promise.all([ triggerSearchReindex(input.bookmarkId, { groupId: ctx.user.id, }), triggerWebhook(input.bookmarkId, "edited", ctx.user.id, { groupId: ctx.user.id, }), ]); return { bookmarkId: input.bookmarkId, summary: summary.response, }; }), });