diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-02-04 09:44:18 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-04 09:44:18 +0000 |
| commit | 3c838ddb26c1e86d3f201ce71f13c834be705f69 (patch) | |
| tree | 892fe4f8cd2ca01d6e4cd34f677fc16aa2fd63f6 /packages/trpc | |
| parent | 3fcccb858ee3ef22fe9ce479af4ce458ac9a0fe1 (diff) | |
| download | karakeep-3c838ddb26c1e86d3f201ce71f13c834be705f69.tar.zst | |
feat: Import workflow v3 (#2378)
* feat: import workflow v3
* batch stage
* revert migration
* cleanups
* pr comments
* move to models
* add allowed workers
* e2e tests
* import list ids
* add missing indices
* merge test
* more fixes
* add resume/pause to UI
* fix ui states
* fix tests
* simplify progress tracking
* remove backpressure
* fix list imports
* fix race on claiming bookmarks
* remove the codex file
Diffstat (limited to 'packages/trpc')
| -rw-r--r-- | packages/trpc/models/importSessions.ts | 174 | ||||
| -rw-r--r-- | packages/trpc/routers/bookmarks.ts | 13 | ||||
| -rw-r--r-- | packages/trpc/routers/importSessions.test.ts | 233 | ||||
| -rw-r--r-- | packages/trpc/routers/importSessions.ts | 109 |
4 files changed, 420 insertions, 109 deletions
diff --git a/packages/trpc/models/importSessions.ts b/packages/trpc/models/importSessions.ts index c324cf7f..ee0eb5b2 100644 --- a/packages/trpc/models/importSessions.ts +++ b/packages/trpc/models/importSessions.ts @@ -2,12 +2,7 @@ import { TRPCError } from "@trpc/server"; import { and, count, eq } from "drizzle-orm"; import { z } from "zod"; -import { - bookmarkLinks, - bookmarks, - importSessionBookmarks, - importSessions, -} from "@karakeep/db/schema"; +import { importSessions, importStagingBookmarks } from "@karakeep/db/schema"; import { zCreateImportSessionRequestSchema, ZImportSession, @@ -81,38 +76,17 @@ export class ImportSession { ); } - async attachBookmark(bookmarkId: string): Promise<void> { - await this.ctx.db.insert(importSessionBookmarks).values({ - importSessionId: this.session.id, - bookmarkId, - }); - } - async getWithStats(): Promise<ZImportSessionWithStats> { - // Get bookmark counts by status + // Count by staging status - this now reflects the true state since + // items stay in "processing" until downstream crawl/tag is complete const statusCounts = await this.ctx.db .select({ - crawlStatus: bookmarkLinks.crawlStatus, - taggingStatus: bookmarks.taggingStatus, + status: importStagingBookmarks.status, count: count(), }) - .from(importSessionBookmarks) - .innerJoin( - importSessions, - eq(importSessions.id, importSessionBookmarks.importSessionId), - ) - .leftJoin(bookmarks, eq(bookmarks.id, importSessionBookmarks.bookmarkId)) - .leftJoin( - bookmarkLinks, - eq(bookmarkLinks.id, importSessionBookmarks.bookmarkId), - ) - .where( - and( - eq(importSessionBookmarks.importSessionId, this.session.id), - eq(importSessions.userId, this.ctx.user.id), - ), - ) - .groupBy(bookmarkLinks.crawlStatus, bookmarks.taggingStatus); + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.importSessionId, this.session.id)) + .groupBy(importStagingBookmarks.status); const stats = { totalBookmarks: 0, @@ -122,41 +96,27 @@ export class ImportSession { 
processingBookmarks: 0, }; - statusCounts.forEach((statusCount) => { - const { crawlStatus, taggingStatus, count } = statusCount; - - stats.totalBookmarks += count; - - const isCrawlFailure = crawlStatus === "failure"; - const isTagFailure = taggingStatus === "failure"; - if (isCrawlFailure || isTagFailure) { - stats.failedBookmarks += count; - return; - } - - const isCrawlPending = crawlStatus === "pending"; - const isTagPending = taggingStatus === "pending"; - if (isCrawlPending || isTagPending) { - stats.pendingBookmarks += count; - return; - } - - const isCrawlSuccessfulOrNotRequired = - crawlStatus === "success" || crawlStatus === null; - const isTagSuccessfulOrUnknown = - taggingStatus === "success" || taggingStatus === null; - - if (isCrawlSuccessfulOrNotRequired && isTagSuccessfulOrUnknown) { - stats.completedBookmarks += count; - } else { - // Fallback to pending to avoid leaving imports unclassified - stats.pendingBookmarks += count; + statusCounts.forEach(({ status, count: itemCount }) => { + stats.totalBookmarks += itemCount; + + switch (status) { + case "pending": + stats.pendingBookmarks += itemCount; + break; + case "processing": + stats.processingBookmarks += itemCount; + break; + case "completed": + stats.completedBookmarks += itemCount; + break; + case "failed": + stats.failedBookmarks += itemCount; + break; } }); return { ...this.session, - status: stats.pendingBookmarks > 0 ? 
"in_progress" : "completed", ...stats, }; } @@ -179,4 +139,92 @@ export class ImportSession { }); } } + + async stageBookmarks( + bookmarks: { + type: "link" | "text" | "asset"; + url?: string; + title?: string; + content?: string; + note?: string; + tags: string[]; + listIds: string[]; + sourceAddedAt?: Date; + }[], + ): Promise<void> { + if (this.session.status !== "staging") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not in staging status", + }); + } + + // Filter out invalid bookmarks (link without url, text without content) + const validBookmarks = bookmarks.filter((bookmark) => { + if (bookmark.type === "link" && !bookmark.url) return false; + if (bookmark.type === "text" && !bookmark.content) return false; + return true; + }); + + if (validBookmarks.length === 0) { + return; + } + + await this.ctx.db.insert(importStagingBookmarks).values( + validBookmarks.map((bookmark) => ({ + importSessionId: this.session.id, + type: bookmark.type, + url: bookmark.url, + title: bookmark.title, + content: bookmark.content, + note: bookmark.note, + tags: bookmark.tags, + listIds: bookmark.listIds, + sourceAddedAt: bookmark.sourceAddedAt, + status: "pending" as const, + })), + ); + } + + async finalize(): Promise<void> { + if (this.session.status !== "staging") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not in staging status", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "pending" }) + .where(eq(importSessions.id, this.session.id)); + } + + async pause(): Promise<void> { + if (!["pending", "running"].includes(this.session.status)) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session cannot be paused in current status", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "paused" }) + .where(eq(importSessions.id, this.session.id)); + } + + async resume(): Promise<void> { + if (this.session.status !== "paused") { + throw new TRPCError({ + code: "BAD_REQUEST", 
+ message: "Session not paused", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "pending" }) + .where(eq(importSessions.id, this.session.id)); + } } diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts index 882ff9b1..59c93581 100644 --- a/packages/trpc/routers/bookmarks.ts +++ b/packages/trpc/routers/bookmarks.ts @@ -51,7 +51,6 @@ import { authedProcedure, createRateLimitMiddleware, router } from "../index"; import { getBookmarkIdsFromMatcher } from "../lib/search"; import { Asset } from "../models/assets"; import { BareBookmark, Bookmark } from "../models/bookmarks"; -import { ImportSession } from "../models/importSessions"; export const ensureBookmarkOwnership = experimental_trpcMiddleware<{ ctx: AuthedContext; @@ -121,13 +120,6 @@ export const bookmarksAppRouter = router({ // This doesn't 100% protect from duplicates because of races, but it's more than enough for this usecase. const alreadyExists = await attemptToDedupLink(ctx, input.url); if (alreadyExists) { - if (input.importSessionId) { - const session = await ImportSession.fromId( - ctx, - input.importSessionId, - ); - await session.attachBookmark(alreadyExists.id); - } return { ...alreadyExists, alreadyExists: true }; } } @@ -277,11 +269,6 @@ export const bookmarksAppRouter = router({ }, ); - if (input.importSessionId) { - const session = await ImportSession.fromId(ctx, input.importSessionId); - await session.attachBookmark(bookmark.id); - } - const enqueueOpts: EnqueueOptions = { // The lower the priority number, the sooner the job will be processed priority: input.crawlPriority === "low" ? 
50 : 0, diff --git a/packages/trpc/routers/importSessions.test.ts b/packages/trpc/routers/importSessions.test.ts index 9ef0de6f..f257ad3b 100644 --- a/packages/trpc/routers/importSessions.test.ts +++ b/packages/trpc/routers/importSessions.test.ts @@ -1,12 +1,13 @@ -import { eq } from "drizzle-orm"; import { beforeEach, describe, expect, test } from "vitest"; import { z } from "zod"; -import { bookmarks } from "@karakeep/db/schema"; import { - BookmarkTypes, - zNewBookmarkRequestSchema, -} from "@karakeep/shared/types/bookmarks"; + bookmarkLinks, + bookmarks, + bookmarkTexts, + importStagingBookmarks, +} from "@karakeep/db/schema"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; import { zCreateImportSessionRequestSchema, zDeleteImportSessionRequestSchema, @@ -20,17 +21,6 @@ import { defaultBeforeEach } from "../testUtils"; beforeEach<CustomTestContext>(defaultBeforeEach(true)); describe("ImportSessions Routes", () => { - async function createTestBookmark(api: APICallerType, sessionId: string) { - const newBookmarkInput: z.infer<typeof zNewBookmarkRequestSchema> = { - type: BookmarkTypes.TEXT, - text: "Test bookmark text", - importSessionId: sessionId, - }; - const createdBookmark = - await api.bookmarks.createBookmark(newBookmarkInput); - return createdBookmark.id; - } - async function createTestList(api: APICallerType) { const newListInput: z.infer<typeof zNewBookmarkListSchema> = { name: "Test Import List", @@ -98,8 +88,15 @@ describe("ImportSessions Routes", () => { const session = await api.importSessions.createImportSession({ name: "Test Import Session", }); - await createTestBookmark(api, session.id); - await createTestBookmark(api, session.id); + + // Stage bookmarks using the staging flow + await api.importSessions.stageImportedBookmarks({ + importSessionId: session.id, + bookmarks: [ + { type: "text", content: "Test bookmark 1", tags: [], listIds: [] }, + { type: "text", content: "Test bookmark 2", tags: [], listIds: [] }, + ], + }); 
const statsInput: z.infer<typeof zGetImportSessionStatsRequestSchema> = { importSessionId: session.id, @@ -110,7 +107,7 @@ describe("ImportSessions Routes", () => { expect(stats).toMatchObject({ id: session.id, name: "Test Import Session", - status: "in_progress", + status: "staging", totalBookmarks: 2, pendingBookmarks: 2, completedBookmarks: 0, @@ -119,31 +116,191 @@ describe("ImportSessions Routes", () => { }); }); - test<CustomTestContext>("marks text-only imports as completed when tagging succeeds", async ({ + test<CustomTestContext>("stats reflect crawl and tagging status for completed staging bookmarks", async ({ apiCallers, db, }) => { const api = apiCallers[0]; + const session = await api.importSessions.createImportSession({ - name: "Text Import Session", + name: "Test Import Session", + }); + + // Create bookmarks with different crawl/tag statuses + const user = (await db.query.users.findFirst())!; + + // 1. Link bookmark: crawl success, tag success -> completed + const [completedLinkBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.LINK, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkLinks).values({ + id: completedLinkBookmark.id, + url: "https://example.com/1", + crawlStatus: "success", + }); + + // 2. Link bookmark: crawl pending, tag success -> processing + const [crawlPendingBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.LINK, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkLinks).values({ + id: crawlPendingBookmark.id, + url: "https://example.com/2", + crawlStatus: "pending", + }); + + // 3. Text bookmark: tag pending -> processing + const [tagPendingBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.TEXT, + taggingStatus: "pending", + }) + .returning(); + await db.insert(bookmarkTexts).values({ + id: tagPendingBookmark.id, + text: "Test text", + }); + + // 4. 
Link bookmark: crawl failure -> failed + const [crawlFailedBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.LINK, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkLinks).values({ + id: crawlFailedBookmark.id, + url: "https://example.com/3", + crawlStatus: "failure", }); - const bookmarkId = await createTestBookmark(api, session.id); - await db - .update(bookmarks) - .set({ taggingStatus: "success" }) - .where(eq(bookmarks.id, bookmarkId)); + // 5. Text bookmark: tag failure -> failed + const [tagFailedBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.TEXT, + taggingStatus: "failure", + }) + .returning(); + await db.insert(bookmarkTexts).values({ + id: tagFailedBookmark.id, + text: "Test text 2", + }); + + // 6. Text bookmark: tag success (no crawl needed) -> completed + const [completedTextBookmark] = await db + .insert(bookmarks) + .values({ + userId: user.id, + type: BookmarkTypes.TEXT, + taggingStatus: "success", + }) + .returning(); + await db.insert(bookmarkTexts).values({ + id: completedTextBookmark.id, + text: "Test text 3", + }); + + // Create staging bookmarks in different states + // Note: With the new import worker design, items stay in "processing" until + // crawl/tag is done. Only then do they move to "completed". 
+ await db.insert(importStagingBookmarks).values([ + // Staging pending -> pendingBookmarks + { + importSessionId: session.id, + type: "text", + content: "pending staging", + status: "pending", + }, + // Staging processing (no bookmark yet) -> processingBookmarks + { + importSessionId: session.id, + type: "text", + content: "processing staging", + status: "processing", + }, + // Staging failed -> failedBookmarks + { + importSessionId: session.id, + type: "text", + content: "failed staging", + status: "failed", + }, + // Staging completed + crawl/tag success -> completedBookmarks + { + importSessionId: session.id, + type: "link", + url: "https://example.com/1", + status: "completed", + resultBookmarkId: completedLinkBookmark.id, + }, + // Staging processing + crawl pending -> processingBookmarks (waiting for crawl) + { + importSessionId: session.id, + type: "link", + url: "https://example.com/2", + status: "processing", + resultBookmarkId: crawlPendingBookmark.id, + }, + // Staging processing + tag pending -> processingBookmarks (waiting for tag) + { + importSessionId: session.id, + type: "text", + content: "tag pending", + status: "processing", + resultBookmarkId: tagPendingBookmark.id, + }, + // Staging completed + crawl failure -> completedBookmarks (failure is terminal) + { + importSessionId: session.id, + type: "link", + url: "https://example.com/3", + status: "completed", + resultBookmarkId: crawlFailedBookmark.id, + }, + // Staging completed + tag failure -> completedBookmarks (failure is terminal) + { + importSessionId: session.id, + type: "text", + content: "tag failed", + status: "completed", + resultBookmarkId: tagFailedBookmark.id, + }, + // Staging completed + tag success (text, no crawl) -> completedBookmarks + { + importSessionId: session.id, + type: "text", + content: "completed text", + status: "completed", + resultBookmarkId: completedTextBookmark.id, + }, + ]); const stats = await api.importSessions.getImportSessionStats({ importSessionId: 
session.id, }); expect(stats).toMatchObject({ - completedBookmarks: 1, - pendingBookmarks: 0, - failedBookmarks: 0, - totalBookmarks: 1, - status: "completed", + totalBookmarks: 9, + pendingBookmarks: 1, // staging pending + processingBookmarks: 3, // staging processing (no bookmark) + crawl pending + tag pending + completedBookmarks: 4, // link success + text success + crawl failure + tag failure + failedBookmarks: 1, // staging failed }); }); @@ -215,7 +372,7 @@ describe("ImportSessions Routes", () => { ).rejects.toThrow("Import session not found"); }); - test<CustomTestContext>("cannot attach other user's bookmark", async ({ + test<CustomTestContext>("cannot stage other user's session", async ({ apiCallers, }) => { const api1 = apiCallers[0]; @@ -228,7 +385,17 @@ describe("ImportSessions Routes", () => { // User 1 tries to attach User 2's bookmark await expect( - createTestBookmark(api2, session.id), // User 2's bookmark + api2.importSessions.stageImportedBookmarks({ + importSessionId: session.id, + bookmarks: [ + { + type: "text", + content: "Test bookmark", + tags: [], + listIds: [], + }, + ], + }), ).rejects.toThrow("Import session not found"); }); }); diff --git a/packages/trpc/routers/importSessions.ts b/packages/trpc/routers/importSessions.ts index 4bdc4f29..62263bdd 100644 --- a/packages/trpc/routers/importSessions.ts +++ b/packages/trpc/routers/importSessions.ts @@ -1,5 +1,8 @@ +import { experimental_trpcMiddleware } from "@trpc/server"; +import { and, eq, gt } from "drizzle-orm"; import { z } from "zod"; +import { importStagingBookmarks } from "@karakeep/db/schema"; import { zCreateImportSessionRequestSchema, zDeleteImportSessionRequestSchema, @@ -9,9 +12,26 @@ import { zListImportSessionsResponseSchema, } from "@karakeep/shared/types/importSessions"; +import type { AuthedContext } from "../index"; import { authedProcedure, router } from "../index"; import { ImportSession } from "../models/importSessions"; +const ensureImportSessionAccess = 
experimental_trpcMiddleware<{ + ctx: AuthedContext; + input: { importSessionId: string }; +}>().create(async (opts) => { + const importSession = await ImportSession.fromId( + opts.ctx, + opts.input.importSessionId, + ); + return opts.next({ + ctx: { + ...opts.ctx, + importSession, + }, + }); +}); + export const importSessionsRouter = router({ createImportSession: authedProcedure .input(zCreateImportSessionRequestSchema) @@ -45,4 +65,93 @@ export const importSessionsRouter = router({ await session.delete(); return { success: true }; }), + + stageImportedBookmarks: authedProcedure + .input( + z.object({ + importSessionId: z.string(), + bookmarks: z + .array( + z.object({ + type: z.enum(["link", "text", "asset"]), + url: z.string().optional(), + title: z.string().optional(), + content: z.string().optional(), + note: z.string().optional(), + tags: z.array(z.string()).default([]), + listIds: z.array(z.string()).default([]), + sourceAddedAt: z.date().optional(), + }), + ) + .max(50), + }), + ) + .use(ensureImportSessionAccess) + .mutation(async ({ input, ctx }) => { + await ctx.importSession.stageBookmarks(input.bookmarks); + }), + + finalizeImportStaging: authedProcedure + .input(z.object({ importSessionId: z.string() })) + .use(ensureImportSessionAccess) + .mutation(async ({ ctx }) => { + await ctx.importSession.finalize(); + }), + + pauseImportSession: authedProcedure + .input(z.object({ importSessionId: z.string() })) + .use(ensureImportSessionAccess) + .mutation(async ({ ctx }) => { + await ctx.importSession.pause(); + }), + + resumeImportSession: authedProcedure + .input(z.object({ importSessionId: z.string() })) + .use(ensureImportSessionAccess) + .mutation(async ({ ctx }) => { + await ctx.importSession.resume(); + }), + + getImportSessionResults: authedProcedure + .input( + z.object({ + importSessionId: z.string(), + filter: z + .enum(["all", "accepted", "rejected", "skipped_duplicate", "pending"]) + .optional(), + cursor: z.string().optional(), + limit: 
z.number().default(50), + }), + ) + .use(ensureImportSessionAccess) + .query(async ({ ctx, input }) => { + const results = await ctx.db + .select() + .from(importStagingBookmarks) + .where( + and( + eq( + importStagingBookmarks.importSessionId, + ctx.importSession.session.id, + ), + input.filter && input.filter !== "all" + ? input.filter === "pending" + ? eq(importStagingBookmarks.status, "pending") + : eq(importStagingBookmarks.result, input.filter) + : undefined, + input.cursor + ? gt(importStagingBookmarks.id, input.cursor) + : undefined, + ), + ) + .orderBy(importStagingBookmarks.id) + .limit(input.limit + 1); + + // Return with pagination info + const hasMore = results.length > input.limit; + return { + items: results.slice(0, input.limit), + nextCursor: hasMore ? results[input.limit - 1].id : null, + }; + }), }); |
