aboutsummaryrefslogtreecommitdiffstats
path: root/packages/trpc/routers
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-02-04 09:44:18 +0000
committerGitHub <noreply@github.com>2026-02-04 09:44:18 +0000
commit3c838ddb26c1e86d3f201ce71f13c834be705f69 (patch)
tree892fe4f8cd2ca01d6e4cd34f677fc16aa2fd63f6 /packages/trpc/routers
parent3fcccb858ee3ef22fe9ce479af4ce458ac9a0fe1 (diff)
downloadkarakeep-3c838ddb26c1e86d3f201ce71f13c834be705f69.tar.zst
feat: Import workflow v3 (#2378)
* feat: import workflow v3 * batch stage * revert migration * cleanups * pr comments * move to models * add allowed workers * e2e tests * import list ids * add missing indices * merge test * more fixes * add resume/pause to UI * fix ui states * fix tests * simplify progress tracking * remove backpressure * fix list imports * fix race on claiming bookmarks * remove the codex file
Diffstat (limited to 'packages/trpc/routers')
-rw-r--r--packages/trpc/routers/bookmarks.ts13
-rw-r--r--packages/trpc/routers/importSessions.test.ts233
-rw-r--r--packages/trpc/routers/importSessions.ts109
3 files changed, 309 insertions, 46 deletions
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 882ff9b1..59c93581 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -51,7 +51,6 @@ import { authedProcedure, createRateLimitMiddleware, router } from "../index";
import { getBookmarkIdsFromMatcher } from "../lib/search";
import { Asset } from "../models/assets";
import { BareBookmark, Bookmark } from "../models/bookmarks";
-import { ImportSession } from "../models/importSessions";
export const ensureBookmarkOwnership = experimental_trpcMiddleware<{
ctx: AuthedContext;
@@ -121,13 +120,6 @@ export const bookmarksAppRouter = router({
// This doesn't 100% protect from duplicates because of races, but it's more than enough for this usecase.
const alreadyExists = await attemptToDedupLink(ctx, input.url);
if (alreadyExists) {
- if (input.importSessionId) {
- const session = await ImportSession.fromId(
- ctx,
- input.importSessionId,
- );
- await session.attachBookmark(alreadyExists.id);
- }
return { ...alreadyExists, alreadyExists: true };
}
}
@@ -277,11 +269,6 @@ export const bookmarksAppRouter = router({
},
);
- if (input.importSessionId) {
- const session = await ImportSession.fromId(ctx, input.importSessionId);
- await session.attachBookmark(bookmark.id);
- }
-
const enqueueOpts: EnqueueOptions = {
// The lower the priority number, the sooner the job will be processed
priority: input.crawlPriority === "low" ? 50 : 0,
diff --git a/packages/trpc/routers/importSessions.test.ts b/packages/trpc/routers/importSessions.test.ts
index 9ef0de6f..f257ad3b 100644
--- a/packages/trpc/routers/importSessions.test.ts
+++ b/packages/trpc/routers/importSessions.test.ts
@@ -1,12 +1,13 @@
-import { eq } from "drizzle-orm";
import { beforeEach, describe, expect, test } from "vitest";
import { z } from "zod";
-import { bookmarks } from "@karakeep/db/schema";
import {
- BookmarkTypes,
- zNewBookmarkRequestSchema,
-} from "@karakeep/shared/types/bookmarks";
+ bookmarkLinks,
+ bookmarks,
+ bookmarkTexts,
+ importStagingBookmarks,
+} from "@karakeep/db/schema";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import {
zCreateImportSessionRequestSchema,
zDeleteImportSessionRequestSchema,
@@ -20,17 +21,6 @@ import { defaultBeforeEach } from "../testUtils";
beforeEach<CustomTestContext>(defaultBeforeEach(true));
describe("ImportSessions Routes", () => {
- async function createTestBookmark(api: APICallerType, sessionId: string) {
- const newBookmarkInput: z.infer<typeof zNewBookmarkRequestSchema> = {
- type: BookmarkTypes.TEXT,
- text: "Test bookmark text",
- importSessionId: sessionId,
- };
- const createdBookmark =
- await api.bookmarks.createBookmark(newBookmarkInput);
- return createdBookmark.id;
- }
-
async function createTestList(api: APICallerType) {
const newListInput: z.infer<typeof zNewBookmarkListSchema> = {
name: "Test Import List",
@@ -98,8 +88,15 @@ describe("ImportSessions Routes", () => {
const session = await api.importSessions.createImportSession({
name: "Test Import Session",
});
- await createTestBookmark(api, session.id);
- await createTestBookmark(api, session.id);
+
+ // Stage bookmarks using the staging flow
+ await api.importSessions.stageImportedBookmarks({
+ importSessionId: session.id,
+ bookmarks: [
+ { type: "text", content: "Test bookmark 1", tags: [], listIds: [] },
+ { type: "text", content: "Test bookmark 2", tags: [], listIds: [] },
+ ],
+ });
const statsInput: z.infer<typeof zGetImportSessionStatsRequestSchema> = {
importSessionId: session.id,
@@ -110,7 +107,7 @@ describe("ImportSessions Routes", () => {
expect(stats).toMatchObject({
id: session.id,
name: "Test Import Session",
- status: "in_progress",
+ status: "staging",
totalBookmarks: 2,
pendingBookmarks: 2,
completedBookmarks: 0,
@@ -119,31 +116,191 @@ describe("ImportSessions Routes", () => {
});
});
- test<CustomTestContext>("marks text-only imports as completed when tagging succeeds", async ({
+ test<CustomTestContext>("stats reflect crawl and tagging status for completed staging bookmarks", async ({
apiCallers,
db,
}) => {
const api = apiCallers[0];
+
const session = await api.importSessions.createImportSession({
- name: "Text Import Session",
+ name: "Test Import Session",
+ });
+
+ // Create bookmarks with different crawl/tag statuses
+ const user = (await db.query.users.findFirst())!;
+
+ // 1. Link bookmark: crawl success, tag success -> completed
+ const [completedLinkBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.LINK,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkLinks).values({
+ id: completedLinkBookmark.id,
+ url: "https://example.com/1",
+ crawlStatus: "success",
+ });
+
+ // 2. Link bookmark: crawl pending, tag success -> processing
+ const [crawlPendingBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.LINK,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkLinks).values({
+ id: crawlPendingBookmark.id,
+ url: "https://example.com/2",
+ crawlStatus: "pending",
+ });
+
+ // 3. Text bookmark: tag pending -> processing
+ const [tagPendingBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.TEXT,
+ taggingStatus: "pending",
+ })
+ .returning();
+ await db.insert(bookmarkTexts).values({
+ id: tagPendingBookmark.id,
+ text: "Test text",
+ });
+
+ // 4. Link bookmark: crawl failure -> counted as completed (failure is terminal)
+ const [crawlFailedBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.LINK,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkLinks).values({
+ id: crawlFailedBookmark.id,
+ url: "https://example.com/3",
+ crawlStatus: "failure",
});
- const bookmarkId = await createTestBookmark(api, session.id);
- await db
- .update(bookmarks)
- .set({ taggingStatus: "success" })
- .where(eq(bookmarks.id, bookmarkId));
+ // 5. Text bookmark: tag failure -> counted as completed (failure is terminal)
+ const [tagFailedBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.TEXT,
+ taggingStatus: "failure",
+ })
+ .returning();
+ await db.insert(bookmarkTexts).values({
+ id: tagFailedBookmark.id,
+ text: "Test text 2",
+ });
+
+ // 6. Text bookmark: tag success (no crawl needed) -> completed
+ const [completedTextBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.TEXT,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkTexts).values({
+ id: completedTextBookmark.id,
+ text: "Test text 3",
+ });
+
+ // Create staging bookmarks in different states
+ // Note: With the new import worker design, items stay in "processing" until
+ // crawl/tag is done. Only then do they move to "completed".
+ await db.insert(importStagingBookmarks).values([
+ // Staging pending -> pendingBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "pending staging",
+ status: "pending",
+ },
+ // Staging processing (no bookmark yet) -> processingBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "processing staging",
+ status: "processing",
+ },
+ // Staging failed -> failedBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "failed staging",
+ status: "failed",
+ },
+ // Staging completed + crawl/tag success -> completedBookmarks
+ {
+ importSessionId: session.id,
+ type: "link",
+ url: "https://example.com/1",
+ status: "completed",
+ resultBookmarkId: completedLinkBookmark.id,
+ },
+ // Staging processing + crawl pending -> processingBookmarks (waiting for crawl)
+ {
+ importSessionId: session.id,
+ type: "link",
+ url: "https://example.com/2",
+ status: "processing",
+ resultBookmarkId: crawlPendingBookmark.id,
+ },
+ // Staging processing + tag pending -> processingBookmarks (waiting for tag)
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "tag pending",
+ status: "processing",
+ resultBookmarkId: tagPendingBookmark.id,
+ },
+ // Staging completed + crawl failure -> completedBookmarks (failure is terminal)
+ {
+ importSessionId: session.id,
+ type: "link",
+ url: "https://example.com/3",
+ status: "completed",
+ resultBookmarkId: crawlFailedBookmark.id,
+ },
+ // Staging completed + tag failure -> completedBookmarks (failure is terminal)
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "tag failed",
+ status: "completed",
+ resultBookmarkId: tagFailedBookmark.id,
+ },
+ // Staging completed + tag success (text, no crawl) -> completedBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "completed text",
+ status: "completed",
+ resultBookmarkId: completedTextBookmark.id,
+ },
+ ]);
const stats = await api.importSessions.getImportSessionStats({
importSessionId: session.id,
});
expect(stats).toMatchObject({
- completedBookmarks: 1,
- pendingBookmarks: 0,
- failedBookmarks: 0,
- totalBookmarks: 1,
- status: "completed",
+ totalBookmarks: 9,
+ pendingBookmarks: 1, // staging pending
+ processingBookmarks: 3, // staging processing (no bookmark) + crawl pending + tag pending
+ completedBookmarks: 4, // link success + text success + crawl failure + tag failure
+ failedBookmarks: 1, // staging failed
});
});
@@ -215,7 +372,7 @@ describe("ImportSessions Routes", () => {
).rejects.toThrow("Import session not found");
});
- test<CustomTestContext>("cannot attach other user's bookmark", async ({
+ test<CustomTestContext>("cannot stage other user's session", async ({
apiCallers,
}) => {
const api1 = apiCallers[0];
@@ -228,7 +385,17 @@ describe("ImportSessions Routes", () => {
// User 2 tries to stage bookmarks into a session they do not own
await expect(
- createTestBookmark(api2, session.id), // User 2's bookmark
+ api2.importSessions.stageImportedBookmarks({
+ importSessionId: session.id,
+ bookmarks: [
+ {
+ type: "text",
+ content: "Test bookmark",
+ tags: [],
+ listIds: [],
+ },
+ ],
+ }),
).rejects.toThrow("Import session not found");
});
});
diff --git a/packages/trpc/routers/importSessions.ts b/packages/trpc/routers/importSessions.ts
index 4bdc4f29..62263bdd 100644
--- a/packages/trpc/routers/importSessions.ts
+++ b/packages/trpc/routers/importSessions.ts
@@ -1,5 +1,8 @@
+import { experimental_trpcMiddleware } from "@trpc/server";
+import { and, eq, gt } from "drizzle-orm";
import { z } from "zod";
+import { importStagingBookmarks } from "@karakeep/db/schema";
import {
zCreateImportSessionRequestSchema,
zDeleteImportSessionRequestSchema,
@@ -9,9 +12,26 @@ import {
zListImportSessionsResponseSchema,
} from "@karakeep/shared/types/importSessions";
+import type { AuthedContext } from "../index";
import { authedProcedure, router } from "../index";
import { ImportSession } from "../models/importSessions";
+/**
+ * tRPC middleware for procedures whose input carries an `importSessionId`.
+ *
+ * Resolves the session via `ImportSession.fromId` with the caller's context
+ * and exposes it to downstream resolvers as `ctx.importSession`. Anything
+ * thrown by `fromId` propagates, so the resolver never runs without a session.
+ */
+const ensureImportSessionAccess = experimental_trpcMiddleware<{
+  ctx: AuthedContext;
+  input: { importSessionId: string };
+}>().create(async ({ ctx, input, next }) => {
+  const importSession = await ImportSession.fromId(ctx, input.importSessionId);
+  return next({ ctx: { ...ctx, importSession } });
+});
+
export const importSessionsRouter = router({
createImportSession: authedProcedure
.input(zCreateImportSessionRequestSchema)
@@ -45,4 +65,93 @@ export const importSessionsRouter = router({
await session.delete();
return { success: true };
}),
+
+  /**
+   * Stages a batch of imported bookmarks on an import session.
+   *
+   * Input: the target `importSessionId` plus at most 50 bookmark records per
+   * call; `tags` and `listIds` default to empty arrays when omitted.
+   * The session is resolved by `ensureImportSessionAccess` before the batch is
+   * handed to `ImportSession.stageBookmarks`. Resolves with no value.
+   */
+  stageImportedBookmarks: authedProcedure
+    .input(
+      z.object({
+        importSessionId: z.string(),
+        bookmarks: z
+          .array(
+            z.object({
+              type: z.enum(["link", "text", "asset"]),
+              url: z.string().optional(),
+              title: z.string().optional(),
+              content: z.string().optional(),
+              note: z.string().optional(),
+              tags: z.array(z.string()).default([]),
+              listIds: z.array(z.string()).default([]),
+              sourceAddedAt: z.date().optional(),
+            }),
+          )
+          .max(50),
+      }),
+    )
+    .use(ensureImportSessionAccess)
+    .mutation(async ({ ctx: { importSession }, input: { bookmarks } }) => {
+      await importSession.stageBookmarks(bookmarks);
+    }),
+
+  /**
+   * Calls `finalize()` on the import session resolved by
+   * `ensureImportSessionAccess`. Resolves with no value.
+   */
+  finalizeImportStaging: authedProcedure
+    .input(z.object({ importSessionId: z.string() }))
+    .use(ensureImportSessionAccess)
+    .mutation(async ({ ctx: { importSession } }) => {
+      await importSession.finalize();
+    }),
+
+  /**
+   * Calls `pause()` on the import session resolved by
+   * `ensureImportSessionAccess`. Resolves with no value.
+   */
+  pauseImportSession: authedProcedure
+    .input(z.object({ importSessionId: z.string() }))
+    .use(ensureImportSessionAccess)
+    .mutation(async ({ ctx: { importSession } }) => {
+      await importSession.pause();
+    }),
+
+  /**
+   * Calls `resume()` on the import session resolved by
+   * `ensureImportSessionAccess`. Resolves with no value.
+   */
+  resumeImportSession: authedProcedure
+    .input(z.object({ importSessionId: z.string() }))
+    .use(ensureImportSessionAccess)
+    .mutation(async ({ ctx: { importSession } }) => {
+      await importSession.resume();
+    }),
+
+  /**
+   * Lists the staging rows of an import session, one page at a time.
+   *
+   * Input:
+   * - `importSessionId`: session to read; resolved (and access-checked) by
+   *   `ensureImportSessionAccess`.
+   * - `filter`: `"pending"` matches on the row's `status`; `"accepted"`,
+   *   `"rejected"` and `"skipped_duplicate"` match on the row's `result`;
+   *   `"all"` or omitted applies no filter.
+   * - `cursor`: id of the last row of the previous page; only rows with a
+   *   greater id are returned.
+   * - `limit`: page size, an integer in [1, 1000], default 50.
+   *
+   * Returns `{ items, nextCursor }`, where `nextCursor` is the id of the last
+   * returned row when more rows exist and `null` otherwise.
+   */
+  getImportSessionResults: authedProcedure
+    .input(
+      z.object({
+        importSessionId: z.string(),
+        filter: z
+          .enum(["all", "accepted", "rejected", "skipped_duplicate", "pending"])
+          .optional(),
+        cursor: z.string().optional(),
+        // Bounded on purpose: a zero, negative or fractional limit used to
+        // reach SQL LIMIT and the `results[limit - 1]` lookup unchecked.
+        limit: z.number().int().min(1).max(1000).default(50),
+      }),
+    )
+    .use(ensureImportSessionAccess)
+    .query(async ({ ctx, input }) => {
+      const { filter, cursor, limit } = input;
+
+      // "pending" lives in the `status` column, the other filters in `result`.
+      const filterCondition =
+        filter === "pending"
+          ? eq(importStagingBookmarks.status, "pending")
+          : filter === undefined || filter === "all"
+            ? undefined
+            : eq(importStagingBookmarks.result, filter);
+
+      const cursorCondition = cursor
+        ? gt(importStagingBookmarks.id, cursor)
+        : undefined;
+
+      // Fetch one extra row to learn whether another page exists.
+      const results = await ctx.db
+        .select()
+        .from(importStagingBookmarks)
+        .where(
+          and(
+            eq(
+              importStagingBookmarks.importSessionId,
+              ctx.importSession.session.id,
+            ),
+            filterCondition,
+            cursorCondition,
+          ),
+        )
+        .orderBy(importStagingBookmarks.id)
+        .limit(limit + 1);
+
+      const hasMore = results.length > limit;
+      const items = results.slice(0, limit);
+      // The next page starts strictly after the last row handed back.
+      const lastItem = items[items.length - 1];
+      return {
+        items,
+        nextCursor: hasMore && lastItem ? lastItem.id : null,
+      };
+    }),
});