From 3c838ddb26c1e86d3f201ce71f13c834be705f69 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Wed, 4 Feb 2026 09:44:18 +0000 Subject: feat: Import workflow v3 (#2378) * feat: import workflow v3 * batch stage * revert migration * cleanups * pr comments * move to models * add allowed workers * e2e tests * import list ids * add missing indicies * merge test * more fixes * add resume/pause to UI * fix ui states * fix tests * simplify progress tracking * remove backpressure * fix list imports * fix race on claiming bookmarks * remove the codex file --- packages/trpc/models/importSessions.ts | 174 ++++++++++++-------- packages/trpc/routers/bookmarks.ts | 13 -- packages/trpc/routers/importSessions.test.ts | 233 +++++++++++++++++++++++---- packages/trpc/routers/importSessions.ts | 109 +++++++++++++ 4 files changed, 420 insertions(+), 109 deletions(-) (limited to 'packages/trpc') diff --git a/packages/trpc/models/importSessions.ts b/packages/trpc/models/importSessions.ts index c324cf7f..ee0eb5b2 100644 --- a/packages/trpc/models/importSessions.ts +++ b/packages/trpc/models/importSessions.ts @@ -2,12 +2,7 @@ import { TRPCError } from "@trpc/server"; import { and, count, eq } from "drizzle-orm"; import { z } from "zod"; -import { - bookmarkLinks, - bookmarks, - importSessionBookmarks, - importSessions, -} from "@karakeep/db/schema"; +import { importSessions, importStagingBookmarks } from "@karakeep/db/schema"; import { zCreateImportSessionRequestSchema, ZImportSession, @@ -81,38 +76,17 @@ export class ImportSession { ); } - async attachBookmark(bookmarkId: string): Promise { - await this.ctx.db.insert(importSessionBookmarks).values({ - importSessionId: this.session.id, - bookmarkId, - }); - } - async getWithStats(): Promise { - // Get bookmark counts by status + // Count by staging status - this now reflects the true state since + // items stay in "processing" until downstream crawl/tag is complete const statusCounts = await this.ctx.db .select({ - crawlStatus: bookmarkLinks.crawlStatus, - taggingStatus: bookmarks.taggingStatus, + status: importStagingBookmarks.status, count: count(), }) - .from(importSessionBookmarks) - .innerJoin( - importSessions, - eq(importSessions.id, importSessionBookmarks.importSessionId), - ) - .leftJoin(bookmarks, eq(bookmarks.id, importSessionBookmarks.bookmarkId)) - .leftJoin( - bookmarkLinks, - eq(bookmarkLinks.id, importSessionBookmarks.bookmarkId), - ) - .where( - and( - eq(importSessionBookmarks.importSessionId, this.session.id), - eq(importSessions.userId, this.ctx.user.id), - ), - ) - .groupBy(bookmarkLinks.crawlStatus, bookmarks.taggingStatus); + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.importSessionId, this.session.id)) + .groupBy(importStagingBookmarks.status); const stats = { totalBookmarks: 0, @@ -122,41 +96,27 @@ export class ImportSession { processingBookmarks: 0, }; - statusCounts.forEach((statusCount) => { - const { crawlStatus, taggingStatus, count } = statusCount; - - stats.totalBookmarks += count; - - const isCrawlFailure = crawlStatus === "failure"; - const isTagFailure = taggingStatus === "failure"; - if (isCrawlFailure || isTagFailure) { - stats.failedBookmarks += count; - return; - } - - const isCrawlPending = crawlStatus === "pending"; - const isTagPending = taggingStatus === "pending"; - if (isCrawlPending || isTagPending) { - stats.pendingBookmarks += count; - return; - } - - const isCrawlSuccessfulOrNotRequired = - crawlStatus === "success" || crawlStatus === null; - const isTagSuccessfulOrUnknown = - taggingStatus === "success" || taggingStatus === null; - - if (isCrawlSuccessfulOrNotRequired && isTagSuccessfulOrUnknown) { - stats.completedBookmarks += count; - } else { - // Fallback to pending to avoid leaving imports unclassified - stats.pendingBookmarks += count; + statusCounts.forEach(({ status, count: itemCount }) => { + stats.totalBookmarks += itemCount; + + switch (status) { + case "pending": + stats.pendingBookmarks += itemCount; + break; + case "processing": + stats.processingBookmarks += itemCount; + break; + case "completed": + stats.completedBookmarks += itemCount; + break; + case "failed": + stats.failedBookmarks += itemCount; + break; } }); return { ...this.session, - status: stats.pendingBookmarks > 0 ? "in_progress" : "completed", ...stats, }; } @@ -179,4 +139,92 @@ export class ImportSession { }); } } + + async stageBookmarks( + bookmarks: { + type: "link" | "text" | "asset"; + url?: string; + title?: string; + content?: string; + note?: string; + tags: string[]; + listIds: string[]; + sourceAddedAt?: Date; + }[], + ): Promise { + if (this.session.status !== "staging") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not in staging status", + }); + } + + // Filter out invalid bookmarks (link without url, text without content) + const validBookmarks = bookmarks.filter((bookmark) => { + if (bookmark.type === "link" && !bookmark.url) return false; + if (bookmark.type === "text" && !bookmark.content) return false; + return true; + }); + + if (validBookmarks.length === 0) { + return; + } + + await this.ctx.db.insert(importStagingBookmarks).values( + validBookmarks.map((bookmark) => ({ + importSessionId: this.session.id, + type: bookmark.type, + url: bookmark.url, + title: bookmark.title, + content: bookmark.content, + note: bookmark.note, + tags: bookmark.tags, + listIds: bookmark.listIds, + sourceAddedAt: bookmark.sourceAddedAt, + status: "pending" as const, + })), + ); + } + + async finalize(): Promise { + if (this.session.status !== "staging") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not in staging status", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "pending" }) + .where(eq(importSessions.id, this.session.id)); + } + + async pause(): Promise { + if (!["pending", "running"].includes(this.session.status)) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session cannot be paused in current status", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "paused" }) + .where(eq(importSessions.id, this.session.id)); + } + + async resume(): Promise { + if (this.session.status !== "paused") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not paused", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "pending" }) + .where(eq(importSessions.id, this.session.id)); + } } diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 882ff9b1..59c93581 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -51,7 +51,6 @@ import { authedProcedure, createRateLimitMiddleware, router } from "../index"; import { getBookmarkIdsFromMatcher } from "../lib/search"; import { Asset } from "../models/assets"; import { BareBookmark, Bookmark } from "../models/bookmarks"; -import { ImportSession } from "../models/importSessions"; export const ensureBookmarkOwnership = experimental_trpcMiddleware<{ ctx: AuthedContext; @@ -121,13 +120,6 @@ export const bookmarksAppRouter = router({ // This doesn't 100% protect from duplicates because of races, but it's more than enough for this usecase. const alreadyExists = await attemptToDedupLink(ctx, input.url); if (alreadyExists) { - if (input.importSessionId) { - const session = await ImportSession.fromId( - ctx, - input.importSessionId, - ); - await session.attachBookmark(alreadyExists.id); - } return { ...alreadyExists, alreadyExists: true }; } } @@ -277,11 +269,6 @@ export const bookmarksAppRouter = router({ }, ); - if (input.importSessionId) { - const session = await ImportSession.fromId(ctx, input.importSessionId); - await session.attachBookmark(bookmark.id); - } - const enqueueOpts: EnqueueOptions = { // The lower the priority number, the sooner the job will be processed priority: input.crawlPriority === "low" ? 50 : 0, diff --git a/packages/trpc/routers/importSessions.test.ts b/packages/trpc/routers/importSessions.test.ts index 9ef0de6f..f257ad3b 100644 --- a/packages/trpc/routers/importSessions.test.ts +++ b/packages/trpc/routers/importSessions.test.ts @@ -1,12 +1,13 @@ -import { eq } from "drizzle-orm"; import { beforeEach, describe, expect, test } from "vitest"; import { z } from "zod"; -import { bookmarks } from "@karakeep/db/schema"; import { - BookmarkTypes, - zNewBookmarkRequestSchema, -} from "@karakeep/shared/types/bookmarks"; + bookmarkLinks, + bookmarks, + bookmarkTexts, + importStagingBookmarks, +} from "@karakeep/db/schema"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import { zCreateImportSessionRequestSchema, zDeleteImportSessionRequestSchema, @@ -20,17 +21,6 @@ import { defaultBeforeEach } from "../testUtils"; beforeEach(defaultBeforeEach(true)); describe("ImportSessions Routes", () => { - async function createTestBookmark(api: APICallerType, sessionId: string) { - const newBookmarkInput: z.infer = { - type: BookmarkTypes.TEXT, - text: "Test bookmark text", - importSessionId: sessionId, - }; - const createdBookmark = - await api.bookmarks.createBookmark(newBookmarkInput); - return createdBookmark.id; - } - async function createTestList(api: APICallerType) { const newListInput: z.infer = { name: "Test Import List", @@ -98,8 +88,15 @@ describe("ImportSessions Routes", () => { const session = await api.importSessions.createImportSession({ name: "Test Import Session", }); - await createTestBookmark(api, session.id); - await createTestBookmark(api, session.id); + + // Stage bookmarks using the staging flow + await api.importSessions.stageImportedBookmarks({ + importSessionId: session.id, + bookmarks: [ + { type: "text", content: "Test bookmark 1", tags: [], listIds: [] }, + { type: "text", content: "Test bookmark 2", tags: [], listIds: [] }, + ], + }); const statsInput: z.infer = { importSessionId: session.id, @@ -110,7 +107,7 @@ describe("ImportSessions Routes", () => { expect(stats).toMatchObject({ id: session.id, name: "Test Import Session", - status: "in_progress", + status: "staging", totalBookmarks: 2, pendingBookmarks: 2, completedBookmarks: 0, @@ -119,31 +116,191 @@ describe("ImportSessions Routes", () => { }); }); - test("marks text-only imports as completed when tagging succeeds", async ({ + test("stats reflect crawl and tagging status for completed staging bookmarks", async ({ apiCallers, db, }) => { const api = apiCallers[0]; + const session = await api.importSessions.createImportSession({ - name: "Text Import Session", + name: "Test Import Session", + }); + + // Create bookmarks with different crawl/tag statuses + const user = (await db.query.users.findFirst())!; + + // 1. Link bookmark: crawl success, tag success -> completed + const [completedLinkBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.LINK, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkLinks).values({ + id: completedLinkBookmark.id, + url: "https://example.com/1", + crawlStatus: "success", + }); + + // 2. Link bookmark: crawl pending, tag success -> processing + const [crawlPendingBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.LINK, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkLinks).values({ + id: crawlPendingBookmark.id, + url: "https://example.com/2", + crawlStatus: "pending", + }); + + // 3. Text bookmark: tag pending -> processing + const [tagPendingBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.TEXT, + taggingStatus: "pending", + }) + .returning(); + await db.insert(bookmarkTexts).values({ + id: tagPendingBookmark.id, + text: "Test text", + }); + + // 4. Link bookmark: crawl failure -> failed + const [crawlFailedBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.LINK, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkLinks).values({ + id: crawlFailedBookmark.id, + url: "https://example.com/3", + crawlStatus: "failure", }); - const bookmarkId = await createTestBookmark(api, session.id); - await db - .update(bookmarks) - .set({ taggingStatus: "success" }) - .where(eq(bookmarks.id, bookmarkId)); + // 5. Text bookmark: tag failure -> failed + const [tagFailedBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.TEXT, + taggingStatus: "failure", + }) + .returning(); + await db.insert(bookmarkTexts).values({ + id: tagFailedBookmark.id, + text: "Test text 2", + }); + + // 6. Text bookmark: tag success (no crawl needed) -> completed + const [completedTextBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.TEXT, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkTexts).values({ + id: completedTextBookmark.id, + text: "Test text 3", + }); + + // Create staging bookmarks in different states + // Note: With the new import worker design, items stay in "processing" until + // crawl/tag is done. Only then do they move to "completed". + await db.insert(importStagingBookmarks).values([ + // Staging pending -> pendingBookmarks + { + importSessionId: session.id, + type: "text", + content: "pending staging", + status: "pending", + }, + // Staging processing (no bookmark yet) -> processingBookmarks + { + importSessionId: session.id, + type: "text", + content: "processing staging", + status: "processing", + }, + // Staging failed -> failedBookmarks + { + importSessionId: session.id, + type: "text", + content: "failed staging", + status: "failed", + }, + // Staging completed + crawl/tag success -> completedBookmarks + { + importSessionId: session.id, + type: "link", + url: "https://example.com/1", + status: "completed", + resultBookmarkId: completedLinkBookmark.id, + }, + // Staging processing + crawl pending -> processingBookmarks (waiting for crawl) + { + importSessionId: session.id, + type: "link", + url: "https://example.com/2", + status: "processing", + resultBookmarkId: crawlPendingBookmark.id, + }, + // Staging processing + tag pending -> processingBookmarks (waiting for tag) + { + importSessionId: session.id, + type: "text", + content: "tag pending", + status: "processing", + resultBookmarkId: tagPendingBookmark.id, + }, + // Staging completed + crawl failure -> completedBookmarks (failure is terminal) + { + importSessionId: session.id, + type: "link", + url: "https://example.com/3", + status: "completed", + resultBookmarkId: crawlFailedBookmark.id, + }, + // Staging completed + tag failure -> completedBookmarks (failure is terminal) + { + importSessionId: session.id, + type: "text", + content: "tag failed", + status: "completed", + resultBookmarkId: tagFailedBookmark.id, + }, + // Staging completed + tag success (text, no crawl) -> completedBookmarks + { + importSessionId: session.id, + type: "text", + content: "completed text", + status: "completed", + resultBookmarkId: completedTextBookmark.id, + }, + ]); const stats = await api.importSessions.getImportSessionStats({ importSessionId: session.id, }); expect(stats).toMatchObject({ - completedBookmarks: 1, - pendingBookmarks: 0, - failedBookmarks: 0, - totalBookmarks: 1, - status: "completed", + totalBookmarks: 9, + pendingBookmarks: 1, // staging pending + processingBookmarks: 3, // staging processing (no bookmark) + crawl pending + tag pending + completedBookmarks: 4, // link success + text success + crawl failure + tag failure + failedBookmarks: 1, // staging failed }); }); @@ -215,7 +372,7 @@ describe("ImportSessions Routes", () => { ).rejects.toThrow("Import session not found"); }); - test("cannot attach other user's bookmark", async ({ + test("cannot stage other user's session", async ({ apiCallers, }) => { const api1 = apiCallers[0]; @@ -228,7 +385,17 @@ describe("ImportSessions Routes", () => { // User 1 tries to attach User 2's bookmark await expect( - createTestBookmark(api2, session.id), // User 2's bookmark + api2.importSessions.stageImportedBookmarks({ + importSessionId: session.id, + bookmarks: [ + { + type: "text", + content: "Test bookmark", + tags: [], + listIds: [], + }, + ], + }), ).rejects.toThrow("Import session not found"); }); }); diff --git a/packages/trpc/routers/importSessions.ts b/packages/trpc/routers/importSessions.ts index 4bdc4f29..62263bdd 100644 --- a/packages/trpc/routers/importSessions.ts +++ b/packages/trpc/routers/importSessions.ts @@ -1,5 +1,8 @@ +import { experimental_trpcMiddleware } from "@trpc/server"; +import { and, eq, gt } from "drizzle-orm"; import { z } from "zod"; +import { importStagingBookmarks } from "@karakeep/db/schema"; import { zCreateImportSessionRequestSchema, zDeleteImportSessionRequestSchema, @@ -9,9 +12,26 @@ import { zListImportSessionsResponseSchema, } from "@karakeep/shared/types/importSessions"; +import type { AuthedContext } from "../index"; import { authedProcedure, router } from "../index"; import { ImportSession } from "../models/importSessions"; +const ensureImportSessionAccess = experimental_trpcMiddleware<{ + ctx: AuthedContext; + input: { importSessionId: string }; +}>().create(async (opts) => { + const importSession = await ImportSession.fromId( + opts.ctx, + opts.input.importSessionId, + ); + return opts.next({ + ctx: { + ...opts.ctx, + importSession, + }, + }); +}); + export const importSessionsRouter = router({ createImportSession: authedProcedure .input(zCreateImportSessionRequestSchema) @@ -45,4 +65,93 @@ export const importSessionsRouter = router({ await session.delete(); return { success: true }; }), + + stageImportedBookmarks: authedProcedure + .input( + z.object({ + importSessionId: z.string(), + bookmarks: z + .array( + z.object({ + type: z.enum(["link", "text", "asset"]), + url: z.string().optional(), + title: z.string().optional(), + content: z.string().optional(), + note: z.string().optional(), + tags: z.array(z.string()).default([]), + listIds: z.array(z.string()).default([]), + sourceAddedAt: z.date().optional(), + }), + ) + .max(50), + }), + ) + .use(ensureImportSessionAccess) + .mutation(async ({ input, ctx }) => { + await ctx.importSession.stageBookmarks(input.bookmarks); + }), + + finalizeImportStaging: authedProcedure + .input(z.object({ importSessionId: z.string() })) + .use(ensureImportSessionAccess) + .mutation(async ({ ctx }) => { + await ctx.importSession.finalize(); + }), + + pauseImportSession: authedProcedure + .input(z.object({ importSessionId: z.string() })) + .use(ensureImportSessionAccess) + .mutation(async ({ ctx }) => { + await ctx.importSession.pause(); + }), + + resumeImportSession: authedProcedure + .input(z.object({ importSessionId: z.string() })) + .use(ensureImportSessionAccess) + .mutation(async ({ ctx }) => { + await ctx.importSession.resume(); + }), + + getImportSessionResults: authedProcedure + .input( + z.object({ + importSessionId: z.string(), + filter: z + .enum(["all", "accepted", "rejected", "skipped_duplicate", "pending"]) + .optional(), + cursor: z.string().optional(), + limit: z.number().default(50), + }), + ) + .use(ensureImportSessionAccess) + .query(async ({ ctx, input }) => { + const results = await ctx.db + .select() + .from(importStagingBookmarks) + .where( + and( + eq( + importStagingBookmarks.importSessionId, + ctx.importSession.session.id, + ), + input.filter && input.filter !== "all" + ? input.filter === "pending" + ? eq(importStagingBookmarks.status, "pending") + : eq(importStagingBookmarks.result, input.filter) + : undefined, + input.cursor + ? gt(importStagingBookmarks.id, input.cursor) + : undefined, + ), + ) + .orderBy(importStagingBookmarks.id) + .limit(input.limit + 1); + + // Return with pagination info + const hasMore = results.length > input.limit; + return { + items: results.slice(0, input.limit), + nextCursor: hasMore ? results[input.limit - 1].id : null, + }; + }), }); -- cgit v1.2.3-70-g09d2