aboutsummaryrefslogtreecommitdiffstats
path: root/packages/trpc
diff options
context:
space:
mode:
Diffstat (limited to 'packages/trpc')
-rw-r--r--packages/trpc/models/importSessions.ts174
-rw-r--r--packages/trpc/routers/bookmarks.ts13
-rw-r--r--packages/trpc/routers/importSessions.test.ts233
-rw-r--r--packages/trpc/routers/importSessions.ts109
4 files changed, 420 insertions, 109 deletions
diff --git a/packages/trpc/models/importSessions.ts b/packages/trpc/models/importSessions.ts
index c324cf7f..ee0eb5b2 100644
--- a/packages/trpc/models/importSessions.ts
+++ b/packages/trpc/models/importSessions.ts
@@ -2,12 +2,7 @@ import { TRPCError } from "@trpc/server";
import { and, count, eq } from "drizzle-orm";
import { z } from "zod";
-import {
- bookmarkLinks,
- bookmarks,
- importSessionBookmarks,
- importSessions,
-} from "@karakeep/db/schema";
+import { importSessions, importStagingBookmarks } from "@karakeep/db/schema";
import {
zCreateImportSessionRequestSchema,
ZImportSession,
@@ -81,38 +76,17 @@ export class ImportSession {
);
}
- async attachBookmark(bookmarkId: string): Promise<void> {
- await this.ctx.db.insert(importSessionBookmarks).values({
- importSessionId: this.session.id,
- bookmarkId,
- });
- }
-
async getWithStats(): Promise<ZImportSessionWithStats> {
- // Get bookmark counts by status
+ // Count by staging status - this now reflects the true state since
+ // items stay in "processing" until downstream crawl/tag is complete
const statusCounts = await this.ctx.db
.select({
- crawlStatus: bookmarkLinks.crawlStatus,
- taggingStatus: bookmarks.taggingStatus,
+ status: importStagingBookmarks.status,
count: count(),
})
- .from(importSessionBookmarks)
- .innerJoin(
- importSessions,
- eq(importSessions.id, importSessionBookmarks.importSessionId),
- )
- .leftJoin(bookmarks, eq(bookmarks.id, importSessionBookmarks.bookmarkId))
- .leftJoin(
- bookmarkLinks,
- eq(bookmarkLinks.id, importSessionBookmarks.bookmarkId),
- )
- .where(
- and(
- eq(importSessionBookmarks.importSessionId, this.session.id),
- eq(importSessions.userId, this.ctx.user.id),
- ),
- )
- .groupBy(bookmarkLinks.crawlStatus, bookmarks.taggingStatus);
+ .from(importStagingBookmarks)
+ .where(eq(importStagingBookmarks.importSessionId, this.session.id))
+ .groupBy(importStagingBookmarks.status);
const stats = {
totalBookmarks: 0,
@@ -122,41 +96,27 @@ export class ImportSession {
processingBookmarks: 0,
};
- statusCounts.forEach((statusCount) => {
- const { crawlStatus, taggingStatus, count } = statusCount;
-
- stats.totalBookmarks += count;
-
- const isCrawlFailure = crawlStatus === "failure";
- const isTagFailure = taggingStatus === "failure";
- if (isCrawlFailure || isTagFailure) {
- stats.failedBookmarks += count;
- return;
- }
-
- const isCrawlPending = crawlStatus === "pending";
- const isTagPending = taggingStatus === "pending";
- if (isCrawlPending || isTagPending) {
- stats.pendingBookmarks += count;
- return;
- }
-
- const isCrawlSuccessfulOrNotRequired =
- crawlStatus === "success" || crawlStatus === null;
- const isTagSuccessfulOrUnknown =
- taggingStatus === "success" || taggingStatus === null;
-
- if (isCrawlSuccessfulOrNotRequired && isTagSuccessfulOrUnknown) {
- stats.completedBookmarks += count;
- } else {
- // Fallback to pending to avoid leaving imports unclassified
- stats.pendingBookmarks += count;
+ statusCounts.forEach(({ status, count: itemCount }) => {
+ stats.totalBookmarks += itemCount;
+
+ switch (status) {
+ case "pending":
+ stats.pendingBookmarks += itemCount;
+ break;
+ case "processing":
+ stats.processingBookmarks += itemCount;
+ break;
+ case "completed":
+ stats.completedBookmarks += itemCount;
+ break;
+ case "failed":
+ stats.failedBookmarks += itemCount;
+ break;
}
});
return {
...this.session,
- status: stats.pendingBookmarks > 0 ? "in_progress" : "completed",
...stats,
};
}
@@ -179,4 +139,92 @@ export class ImportSession {
});
}
}
+
+ async stageBookmarks(
+ bookmarks: {
+ type: "link" | "text" | "asset";
+ url?: string;
+ title?: string;
+ content?: string;
+ note?: string;
+ tags: string[];
+ listIds: string[];
+ sourceAddedAt?: Date;
+ }[],
+ ): Promise<void> {
+ if (this.session.status !== "staging") {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session not in staging status",
+ });
+ }
+
+ // Filter out invalid bookmarks (link without url, text without content)
+ const validBookmarks = bookmarks.filter((bookmark) => {
+ if (bookmark.type === "link" && !bookmark.url) return false;
+ if (bookmark.type === "text" && !bookmark.content) return false;
+ return true;
+ });
+
+ if (validBookmarks.length === 0) {
+ return;
+ }
+
+ await this.ctx.db.insert(importStagingBookmarks).values(
+ validBookmarks.map((bookmark) => ({
+ importSessionId: this.session.id,
+ type: bookmark.type,
+ url: bookmark.url,
+ title: bookmark.title,
+ content: bookmark.content,
+ note: bookmark.note,
+ tags: bookmark.tags,
+ listIds: bookmark.listIds,
+ sourceAddedAt: bookmark.sourceAddedAt,
+ status: "pending" as const,
+ })),
+ );
+ }
+
+ async finalize(): Promise<void> {
+ if (this.session.status !== "staging") {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session not in staging status",
+ });
+ }
+
+ await this.ctx.db
+ .update(importSessions)
+ .set({ status: "pending" })
+ .where(eq(importSessions.id, this.session.id));
+ }
+
+ async pause(): Promise<void> {
+ if (!["pending", "running"].includes(this.session.status)) {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session cannot be paused in current status",
+ });
+ }
+
+ await this.ctx.db
+ .update(importSessions)
+ .set({ status: "paused" })
+ .where(eq(importSessions.id, this.session.id));
+ }
+
+ async resume(): Promise<void> {
+ if (this.session.status !== "paused") {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session not paused",
+ });
+ }
+
+ await this.ctx.db
+ .update(importSessions)
+ .set({ status: "pending" })
+ .where(eq(importSessions.id, this.session.id));
+ }
}
diff --git a/packages/trpc/routers/bookmarks.ts b/packages/trpc/routers/bookmarks.ts
index 882ff9b1..59c93581 100644
--- a/packages/trpc/routers/bookmarks.ts
+++ b/packages/trpc/routers/bookmarks.ts
@@ -51,7 +51,6 @@ import { authedProcedure, createRateLimitMiddleware, router } from "../index";
import { getBookmarkIdsFromMatcher } from "../lib/search";
import { Asset } from "../models/assets";
import { BareBookmark, Bookmark } from "../models/bookmarks";
-import { ImportSession } from "../models/importSessions";
export const ensureBookmarkOwnership = experimental_trpcMiddleware<{
ctx: AuthedContext;
@@ -121,13 +120,6 @@ export const bookmarksAppRouter = router({
// This doesn't 100% protect from duplicates because of races, but it's more than enough for this usecase.
const alreadyExists = await attemptToDedupLink(ctx, input.url);
if (alreadyExists) {
- if (input.importSessionId) {
- const session = await ImportSession.fromId(
- ctx,
- input.importSessionId,
- );
- await session.attachBookmark(alreadyExists.id);
- }
return { ...alreadyExists, alreadyExists: true };
}
}
@@ -277,11 +269,6 @@ export const bookmarksAppRouter = router({
},
);
- if (input.importSessionId) {
- const session = await ImportSession.fromId(ctx, input.importSessionId);
- await session.attachBookmark(bookmark.id);
- }
-
const enqueueOpts: EnqueueOptions = {
// The lower the priority number, the sooner the job will be processed
priority: input.crawlPriority === "low" ? 50 : 0,
diff --git a/packages/trpc/routers/importSessions.test.ts b/packages/trpc/routers/importSessions.test.ts
index 9ef0de6f..f257ad3b 100644
--- a/packages/trpc/routers/importSessions.test.ts
+++ b/packages/trpc/routers/importSessions.test.ts
@@ -1,12 +1,13 @@
-import { eq } from "drizzle-orm";
import { beforeEach, describe, expect, test } from "vitest";
import { z } from "zod";
-import { bookmarks } from "@karakeep/db/schema";
import {
- BookmarkTypes,
- zNewBookmarkRequestSchema,
-} from "@karakeep/shared/types/bookmarks";
+ bookmarkLinks,
+ bookmarks,
+ bookmarkTexts,
+ importStagingBookmarks,
+} from "@karakeep/db/schema";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import {
zCreateImportSessionRequestSchema,
zDeleteImportSessionRequestSchema,
@@ -20,17 +21,6 @@ import { defaultBeforeEach } from "../testUtils";
beforeEach<CustomTestContext>(defaultBeforeEach(true));
describe("ImportSessions Routes", () => {
- async function createTestBookmark(api: APICallerType, sessionId: string) {
- const newBookmarkInput: z.infer<typeof zNewBookmarkRequestSchema> = {
- type: BookmarkTypes.TEXT,
- text: "Test bookmark text",
- importSessionId: sessionId,
- };
- const createdBookmark =
- await api.bookmarks.createBookmark(newBookmarkInput);
- return createdBookmark.id;
- }
-
async function createTestList(api: APICallerType) {
const newListInput: z.infer<typeof zNewBookmarkListSchema> = {
name: "Test Import List",
@@ -98,8 +88,15 @@ describe("ImportSessions Routes", () => {
const session = await api.importSessions.createImportSession({
name: "Test Import Session",
});
- await createTestBookmark(api, session.id);
- await createTestBookmark(api, session.id);
+
+ // Stage bookmarks using the staging flow
+ await api.importSessions.stageImportedBookmarks({
+ importSessionId: session.id,
+ bookmarks: [
+ { type: "text", content: "Test bookmark 1", tags: [], listIds: [] },
+ { type: "text", content: "Test bookmark 2", tags: [], listIds: [] },
+ ],
+ });
const statsInput: z.infer<typeof zGetImportSessionStatsRequestSchema> = {
importSessionId: session.id,
@@ -110,7 +107,7 @@ describe("ImportSessions Routes", () => {
expect(stats).toMatchObject({
id: session.id,
name: "Test Import Session",
- status: "in_progress",
+ status: "staging",
totalBookmarks: 2,
pendingBookmarks: 2,
completedBookmarks: 0,
@@ -119,31 +116,191 @@ describe("ImportSessions Routes", () => {
});
});
- test<CustomTestContext>("marks text-only imports as completed when tagging succeeds", async ({
+ test<CustomTestContext>("stats reflect crawl and tagging status for completed staging bookmarks", async ({
apiCallers,
db,
}) => {
const api = apiCallers[0];
+
const session = await api.importSessions.createImportSession({
- name: "Text Import Session",
+ name: "Test Import Session",
+ });
+
+ // Create bookmarks with different crawl/tag statuses
+ const user = (await db.query.users.findFirst())!;
+
+ // 1. Link bookmark: crawl success, tag success -> completed
+ const [completedLinkBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.LINK,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkLinks).values({
+ id: completedLinkBookmark.id,
+ url: "https://example.com/1",
+ crawlStatus: "success",
+ });
+
+ // 2. Link bookmark: crawl pending, tag success -> processing
+ const [crawlPendingBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.LINK,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkLinks).values({
+ id: crawlPendingBookmark.id,
+ url: "https://example.com/2",
+ crawlStatus: "pending",
+ });
+
+ // 3. Text bookmark: tag pending -> processing
+ const [tagPendingBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.TEXT,
+ taggingStatus: "pending",
+ })
+ .returning();
+ await db.insert(bookmarkTexts).values({
+ id: tagPendingBookmark.id,
+ text: "Test text",
+ });
+
+ // 4. Link bookmark: crawl failure -> failed
+ const [crawlFailedBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.LINK,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkLinks).values({
+ id: crawlFailedBookmark.id,
+ url: "https://example.com/3",
+ crawlStatus: "failure",
});
- const bookmarkId = await createTestBookmark(api, session.id);
- await db
- .update(bookmarks)
- .set({ taggingStatus: "success" })
- .where(eq(bookmarks.id, bookmarkId));
+ // 5. Text bookmark: tag failure -> failed
+ const [tagFailedBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.TEXT,
+ taggingStatus: "failure",
+ })
+ .returning();
+ await db.insert(bookmarkTexts).values({
+ id: tagFailedBookmark.id,
+ text: "Test text 2",
+ });
+
+ // 6. Text bookmark: tag success (no crawl needed) -> completed
+ const [completedTextBookmark] = await db
+ .insert(bookmarks)
+ .values({
+ userId: user.id,
+ type: BookmarkTypes.TEXT,
+ taggingStatus: "success",
+ })
+ .returning();
+ await db.insert(bookmarkTexts).values({
+ id: completedTextBookmark.id,
+ text: "Test text 3",
+ });
+
+ // Create staging bookmarks in different states
+ // Note: With the new import worker design, items stay in "processing" until
+ // crawl/tag is done. Only then do they move to "completed".
+ await db.insert(importStagingBookmarks).values([
+ // Staging pending -> pendingBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "pending staging",
+ status: "pending",
+ },
+ // Staging processing (no bookmark yet) -> processingBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "processing staging",
+ status: "processing",
+ },
+ // Staging failed -> failedBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "failed staging",
+ status: "failed",
+ },
+ // Staging completed + crawl/tag success -> completedBookmarks
+ {
+ importSessionId: session.id,
+ type: "link",
+ url: "https://example.com/1",
+ status: "completed",
+ resultBookmarkId: completedLinkBookmark.id,
+ },
+ // Staging processing + crawl pending -> processingBookmarks (waiting for crawl)
+ {
+ importSessionId: session.id,
+ type: "link",
+ url: "https://example.com/2",
+ status: "processing",
+ resultBookmarkId: crawlPendingBookmark.id,
+ },
+ // Staging processing + tag pending -> processingBookmarks (waiting for tag)
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "tag pending",
+ status: "processing",
+ resultBookmarkId: tagPendingBookmark.id,
+ },
+ // Staging completed + crawl failure -> completedBookmarks (failure is terminal)
+ {
+ importSessionId: session.id,
+ type: "link",
+ url: "https://example.com/3",
+ status: "completed",
+ resultBookmarkId: crawlFailedBookmark.id,
+ },
+ // Staging completed + tag failure -> completedBookmarks (failure is terminal)
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "tag failed",
+ status: "completed",
+ resultBookmarkId: tagFailedBookmark.id,
+ },
+ // Staging completed + tag success (text, no crawl) -> completedBookmarks
+ {
+ importSessionId: session.id,
+ type: "text",
+ content: "completed text",
+ status: "completed",
+ resultBookmarkId: completedTextBookmark.id,
+ },
+ ]);
const stats = await api.importSessions.getImportSessionStats({
importSessionId: session.id,
});
expect(stats).toMatchObject({
- completedBookmarks: 1,
- pendingBookmarks: 0,
- failedBookmarks: 0,
- totalBookmarks: 1,
- status: "completed",
+ totalBookmarks: 9,
+ pendingBookmarks: 1, // staging pending
+ processingBookmarks: 3, // staging processing (no bookmark) + crawl pending + tag pending
+ completedBookmarks: 4, // link success + text success + crawl failure + tag failure
+ failedBookmarks: 1, // staging failed
});
});
@@ -215,7 +372,7 @@ describe("ImportSessions Routes", () => {
).rejects.toThrow("Import session not found");
});
- test<CustomTestContext>("cannot attach other user's bookmark", async ({
+ test<CustomTestContext>("cannot stage other user's session", async ({
apiCallers,
}) => {
const api1 = apiCallers[0];
@@ -228,7 +385,17 @@ describe("ImportSessions Routes", () => {
// User 1 tries to attach User 2's bookmark
await expect(
- createTestBookmark(api2, session.id), // User 2's bookmark
+ api2.importSessions.stageImportedBookmarks({
+ importSessionId: session.id,
+ bookmarks: [
+ {
+ type: "text",
+ content: "Test bookmark",
+ tags: [],
+ listIds: [],
+ },
+ ],
+ }),
).rejects.toThrow("Import session not found");
});
});
diff --git a/packages/trpc/routers/importSessions.ts b/packages/trpc/routers/importSessions.ts
index 4bdc4f29..62263bdd 100644
--- a/packages/trpc/routers/importSessions.ts
+++ b/packages/trpc/routers/importSessions.ts
@@ -1,5 +1,8 @@
+import { experimental_trpcMiddleware } from "@trpc/server";
+import { and, eq, gt } from "drizzle-orm";
import { z } from "zod";
+import { importStagingBookmarks } from "@karakeep/db/schema";
import {
zCreateImportSessionRequestSchema,
zDeleteImportSessionRequestSchema,
@@ -9,9 +12,26 @@ import {
zListImportSessionsResponseSchema,
} from "@karakeep/shared/types/importSessions";
+import type { AuthedContext } from "../index";
import { authedProcedure, router } from "../index";
import { ImportSession } from "../models/importSessions";
+const ensureImportSessionAccess = experimental_trpcMiddleware<{
+ ctx: AuthedContext;
+ input: { importSessionId: string };
+}>().create(async (opts) => {
+ const importSession = await ImportSession.fromId(
+ opts.ctx,
+ opts.input.importSessionId,
+ );
+ return opts.next({
+ ctx: {
+ ...opts.ctx,
+ importSession,
+ },
+ });
+});
+
export const importSessionsRouter = router({
createImportSession: authedProcedure
.input(zCreateImportSessionRequestSchema)
@@ -45,4 +65,93 @@ export const importSessionsRouter = router({
await session.delete();
return { success: true };
}),
+
+ stageImportedBookmarks: authedProcedure
+ .input(
+ z.object({
+ importSessionId: z.string(),
+ bookmarks: z
+ .array(
+ z.object({
+ type: z.enum(["link", "text", "asset"]),
+ url: z.string().optional(),
+ title: z.string().optional(),
+ content: z.string().optional(),
+ note: z.string().optional(),
+ tags: z.array(z.string()).default([]),
+ listIds: z.array(z.string()).default([]),
+ sourceAddedAt: z.date().optional(),
+ }),
+ )
+ .max(50),
+ }),
+ )
+ .use(ensureImportSessionAccess)
+ .mutation(async ({ input, ctx }) => {
+ await ctx.importSession.stageBookmarks(input.bookmarks);
+ }),
+
+ finalizeImportStaging: authedProcedure
+ .input(z.object({ importSessionId: z.string() }))
+ .use(ensureImportSessionAccess)
+ .mutation(async ({ ctx }) => {
+ await ctx.importSession.finalize();
+ }),
+
+ pauseImportSession: authedProcedure
+ .input(z.object({ importSessionId: z.string() }))
+ .use(ensureImportSessionAccess)
+ .mutation(async ({ ctx }) => {
+ await ctx.importSession.pause();
+ }),
+
+ resumeImportSession: authedProcedure
+ .input(z.object({ importSessionId: z.string() }))
+ .use(ensureImportSessionAccess)
+ .mutation(async ({ ctx }) => {
+ await ctx.importSession.resume();
+ }),
+
+ getImportSessionResults: authedProcedure
+ .input(
+ z.object({
+ importSessionId: z.string(),
+ filter: z
+ .enum(["all", "accepted", "rejected", "skipped_duplicate", "pending"])
+ .optional(),
+ cursor: z.string().optional(),
+ limit: z.number().default(50),
+ }),
+ )
+ .use(ensureImportSessionAccess)
+ .query(async ({ ctx, input }) => {
+ const results = await ctx.db
+ .select()
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(
+ importStagingBookmarks.importSessionId,
+ ctx.importSession.session.id,
+ ),
+ input.filter && input.filter !== "all"
+ ? input.filter === "pending"
+ ? eq(importStagingBookmarks.status, "pending")
+ : eq(importStagingBookmarks.result, input.filter)
+ : undefined,
+ input.cursor
+ ? gt(importStagingBookmarks.id, input.cursor)
+ : undefined,
+ ),
+ )
+ .orderBy(importStagingBookmarks.id)
+ .limit(input.limit + 1);
+
+ // Return with pagination info
+ const hasMore = results.length > input.limit;
+ return {
+ items: results.slice(0, input.limit),
+ nextCursor: hasMore ? results[input.limit - 1].id : null,
+ };
+ }),
});