import { TRPCError } from "@trpc/server"; import { and, count, eq } from "drizzle-orm"; import { z } from "zod"; import { importSessions, importStagingBookmarks } from "@karakeep/db/schema"; import { zCreateImportSessionRequestSchema, ZImportSession, ZImportSessionWithStats, } from "@karakeep/shared/types/importSessions"; import type { AuthedContext } from "../index"; export class ImportSession { protected constructor( protected ctx: AuthedContext, public session: ZImportSession, ) {} static async fromId( ctx: AuthedContext, importSessionId: string, ): Promise { const session = await ctx.db.query.importSessions.findFirst({ where: and( eq(importSessions.id, importSessionId), eq(importSessions.userId, ctx.user.id), ), }); if (!session) { throw new TRPCError({ code: "NOT_FOUND", message: "Import session not found", }); } return new ImportSession(ctx, session); } static async create( ctx: AuthedContext, input: z.infer, ): Promise { const [session] = await ctx.db .insert(importSessions) .values({ name: input.name, userId: ctx.user.id, rootListId: input.rootListId, }) .returning(); return new ImportSession(ctx, session); } static async getAll(ctx: AuthedContext): Promise { const sessions = await ctx.db.query.importSessions.findMany({ where: eq(importSessions.userId, ctx.user.id), orderBy: (importSessions, { desc }) => [desc(importSessions.createdAt)], limit: 50, }); return sessions.map((session) => new ImportSession(ctx, session)); } static async getAllWithStats( ctx: AuthedContext, ): Promise { const sessions = await this.getAll(ctx); return await Promise.all( sessions.map(async (session) => { return await session.getWithStats(); }), ); } async getWithStats(): Promise { // Count by staging status - this now reflects the true state since // items stay in "processing" until downstream crawl/tag is complete const statusCounts = await this.ctx.db .select({ status: importStagingBookmarks.status, count: count(), }) .from(importStagingBookmarks) .where(eq(importStagingBookmarks.importSessionId, this.session.id)) .groupBy(importStagingBookmarks.status); const stats = { totalBookmarks: 0, completedBookmarks: 0, failedBookmarks: 0, pendingBookmarks: 0, processingBookmarks: 0, }; statusCounts.forEach(({ status, count: itemCount }) => { stats.totalBookmarks += itemCount; switch (status) { case "pending": stats.pendingBookmarks += itemCount; break; case "processing": stats.processingBookmarks += itemCount; break; case "completed": stats.completedBookmarks += itemCount; break; case "failed": stats.failedBookmarks += itemCount; break; } }); return { ...this.session, ...stats, }; } async delete(): Promise { // Delete the session (cascade will handle the bookmarks) const result = await this.ctx.db .delete(importSessions) .where( and( eq(importSessions.id, this.session.id), eq(importSessions.userId, this.ctx.user.id), ), ); if (result.changes === 0) { throw new TRPCError({ code: "NOT_FOUND", message: "Import session not found", }); } } async stageBookmarks( bookmarks: { type: "link" | "text" | "asset"; url?: string; title?: string; content?: string; note?: string; tags: string[]; listIds: string[]; sourceAddedAt?: Date; }[], ): Promise { if (this.session.status !== "staging") { throw new TRPCError({ code: "BAD_REQUEST", message: "Session not in staging status", }); } // Filter out invalid bookmarks (link without url, text without content) const validBookmarks = bookmarks.filter((bookmark) => { if (bookmark.type === "link" && !bookmark.url) return false; if (bookmark.type === "text" && !bookmark.content) return false; return true; }); if (validBookmarks.length === 0) { return; } await this.ctx.db.insert(importStagingBookmarks).values( validBookmarks.map((bookmark) => ({ importSessionId: this.session.id, type: bookmark.type, url: bookmark.url, title: bookmark.title, content: bookmark.content, note: bookmark.note, tags: bookmark.tags, listIds: bookmark.listIds, sourceAddedAt: bookmark.sourceAddedAt, status: "pending" as const, })), ); } async finalize(): Promise { if (this.session.status !== "staging") { throw new TRPCError({ code: "BAD_REQUEST", message: "Session not in staging status", }); } await this.ctx.db .update(importSessions) .set({ status: "pending" }) .where(eq(importSessions.id, this.session.id)); } async pause(): Promise { if (!["pending", "running"].includes(this.session.status)) { throw new TRPCError({ code: "BAD_REQUEST", message: "Session cannot be paused in current status", }); } await this.ctx.db .update(importSessions) .set({ status: "paused" }) .where(eq(importSessions.id, this.session.id)); } async resume(): Promise { if (this.session.status !== "paused") { throw new TRPCError({ code: "BAD_REQUEST", message: "Session not paused", }); } await this.ctx.db .update(importSessions) .set({ status: "pending" }) .where(eq(importSessions.id, this.session.id)); } }