diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-02-04 09:44:18 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-04 09:44:18 +0000 |
| commit | 3c838ddb26c1e86d3f201ce71f13c834be705f69 (patch) | |
| tree | 892fe4f8cd2ca01d6e4cd34f677fc16aa2fd63f6 /packages/trpc/models | |
| parent | 3fcccb858ee3ef22fe9ce479af4ce458ac9a0fe1 (diff) | |
| download | karakeep-3c838ddb26c1e86d3f201ce71f13c834be705f69.tar.zst | |
feat: Import workflow v3 (#2378)
* feat: import workflow v3
* batch stage
* revert migration
* cleanups
* pr comments
* move to models
* add allowed workers
* e2e tests
* import list ids
* add missing indicies
* merge test
* more fixes
* add resume/pause to UI
* fix ui states
* fix tests
* simplify progress tracking
* remove backpressure
* fix list imports
* fix race on claiming bookmarks
* remove the codex file
Diffstat (limited to 'packages/trpc/models')
| -rw-r--r-- | packages/trpc/models/importSessions.ts | 174 |
1 files changed, 111 insertions, 63 deletions
diff --git a/packages/trpc/models/importSessions.ts b/packages/trpc/models/importSessions.ts index c324cf7f..ee0eb5b2 100644 --- a/packages/trpc/models/importSessions.ts +++ b/packages/trpc/models/importSessions.ts @@ -2,12 +2,7 @@ import { TRPCError } from "@trpc/server"; import { and, count, eq } from "drizzle-orm"; import { z } from "zod"; -import { - bookmarkLinks, - bookmarks, - importSessionBookmarks, - importSessions, -} from "@karakeep/db/schema"; +import { importSessions, importStagingBookmarks } from "@karakeep/db/schema"; import { zCreateImportSessionRequestSchema, ZImportSession, @@ -81,38 +76,17 @@ export class ImportSession { ); } - async attachBookmark(bookmarkId: string): Promise<void> { - await this.ctx.db.insert(importSessionBookmarks).values({ - importSessionId: this.session.id, - bookmarkId, - }); - } - async getWithStats(): Promise<ZImportSessionWithStats> { - // Get bookmark counts by status + // Count by staging status - this now reflects the true state since + // items stay in "processing" until downstream crawl/tag is complete const statusCounts = await this.ctx.db .select({ - crawlStatus: bookmarkLinks.crawlStatus, - taggingStatus: bookmarks.taggingStatus, + status: importStagingBookmarks.status, count: count(), }) - .from(importSessionBookmarks) - .innerJoin( - importSessions, - eq(importSessions.id, importSessionBookmarks.importSessionId), - ) - .leftJoin(bookmarks, eq(bookmarks.id, importSessionBookmarks.bookmarkId)) - .leftJoin( - bookmarkLinks, - eq(bookmarkLinks.id, importSessionBookmarks.bookmarkId), - ) - .where( - and( - eq(importSessionBookmarks.importSessionId, this.session.id), - eq(importSessions.userId, this.ctx.user.id), - ), - ) - .groupBy(bookmarkLinks.crawlStatus, bookmarks.taggingStatus); + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.importSessionId, this.session.id)) + .groupBy(importStagingBookmarks.status); const stats = { totalBookmarks: 0, @@ -122,41 +96,27 @@ export class ImportSession { processingBookmarks: 0, }; - statusCounts.forEach((statusCount) => { - const { crawlStatus, taggingStatus, count } = statusCount; - - stats.totalBookmarks += count; - - const isCrawlFailure = crawlStatus === "failure"; - const isTagFailure = taggingStatus === "failure"; - if (isCrawlFailure || isTagFailure) { - stats.failedBookmarks += count; - return; - } - - const isCrawlPending = crawlStatus === "pending"; - const isTagPending = taggingStatus === "pending"; - if (isCrawlPending || isTagPending) { - stats.pendingBookmarks += count; - return; - } - - const isCrawlSuccessfulOrNotRequired = - crawlStatus === "success" || crawlStatus === null; - const isTagSuccessfulOrUnknown = - taggingStatus === "success" || taggingStatus === null; - - if (isCrawlSuccessfulOrNotRequired && isTagSuccessfulOrUnknown) { - stats.completedBookmarks += count; - } else { - // Fallback to pending to avoid leaving imports unclassified - stats.pendingBookmarks += count; + statusCounts.forEach(({ status, count: itemCount }) => { + stats.totalBookmarks += itemCount; + + switch (status) { + case "pending": + stats.pendingBookmarks += itemCount; + break; + case "processing": + stats.processingBookmarks += itemCount; + break; + case "completed": + stats.completedBookmarks += itemCount; + break; + case "failed": + stats.failedBookmarks += itemCount; + break; } }); return { ...this.session, - status: stats.pendingBookmarks > 0 ? "in_progress" : "completed", ...stats, }; } @@ -179,4 +139,92 @@ export class ImportSession { }); } } + + async stageBookmarks( + bookmarks: { + type: "link" | "text" | "asset"; + url?: string; + title?: string; + content?: string; + note?: string; + tags: string[]; + listIds: string[]; + sourceAddedAt?: Date; + }[], + ): Promise<void> { + if (this.session.status !== "staging") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not in staging status", + }); + } + + // Filter out invalid bookmarks (link without url, text without content) + const validBookmarks = bookmarks.filter((bookmark) => { + if (bookmark.type === "link" && !bookmark.url) return false; + if (bookmark.type === "text" && !bookmark.content) return false; + return true; + }); + + if (validBookmarks.length === 0) { + return; + } + + await this.ctx.db.insert(importStagingBookmarks).values( + validBookmarks.map((bookmark) => ({ + importSessionId: this.session.id, + type: bookmark.type, + url: bookmark.url, + title: bookmark.title, + content: bookmark.content, + note: bookmark.note, + tags: bookmark.tags, + listIds: bookmark.listIds, + sourceAddedAt: bookmark.sourceAddedAt, + status: "pending" as const, + })), + ); + } + + async finalize(): Promise<void> { + if (this.session.status !== "staging") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not in staging status", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "pending" }) + .where(eq(importSessions.id, this.session.id)); + } + + async pause(): Promise<void> { + if (!["pending", "running"].includes(this.session.status)) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session cannot be paused in current status", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "paused" }) + .where(eq(importSessions.id, this.session.id)); + } + + async resume(): Promise<void> { + if (this.session.status !== "paused") { + throw new TRPCError({ + code: "BAD_REQUEST", + message: "Session not paused", + }); + } + + await this.ctx.db + .update(importSessions) + .set({ status: "pending" }) + .where(eq(importSessions.id, this.session.id)); + } } |
