aboutsummaryrefslogtreecommitdiffstats
path: root/packages/trpc/models
diff options
context:
space:
mode:
Diffstat (limited to 'packages/trpc/models')
-rw-r--r--packages/trpc/models/importSessions.ts174
1 files changed, 111 insertions, 63 deletions
diff --git a/packages/trpc/models/importSessions.ts b/packages/trpc/models/importSessions.ts
index c324cf7f..ee0eb5b2 100644
--- a/packages/trpc/models/importSessions.ts
+++ b/packages/trpc/models/importSessions.ts
@@ -2,12 +2,7 @@ import { TRPCError } from "@trpc/server";
import { and, count, eq } from "drizzle-orm";
import { z } from "zod";
-import {
- bookmarkLinks,
- bookmarks,
- importSessionBookmarks,
- importSessions,
-} from "@karakeep/db/schema";
+import { importSessions, importStagingBookmarks } from "@karakeep/db/schema";
import {
zCreateImportSessionRequestSchema,
ZImportSession,
@@ -81,38 +76,17 @@ export class ImportSession {
);
}
- async attachBookmark(bookmarkId: string): Promise<void> {
- await this.ctx.db.insert(importSessionBookmarks).values({
- importSessionId: this.session.id,
- bookmarkId,
- });
- }
-
async getWithStats(): Promise<ZImportSessionWithStats> {
- // Get bookmark counts by status
+ // Count by staging status - this now reflects the true state since
+ // items stay in "processing" until downstream crawl/tag is complete
const statusCounts = await this.ctx.db
.select({
- crawlStatus: bookmarkLinks.crawlStatus,
- taggingStatus: bookmarks.taggingStatus,
+ status: importStagingBookmarks.status,
count: count(),
})
- .from(importSessionBookmarks)
- .innerJoin(
- importSessions,
- eq(importSessions.id, importSessionBookmarks.importSessionId),
- )
- .leftJoin(bookmarks, eq(bookmarks.id, importSessionBookmarks.bookmarkId))
- .leftJoin(
- bookmarkLinks,
- eq(bookmarkLinks.id, importSessionBookmarks.bookmarkId),
- )
- .where(
- and(
- eq(importSessionBookmarks.importSessionId, this.session.id),
- eq(importSessions.userId, this.ctx.user.id),
- ),
- )
- .groupBy(bookmarkLinks.crawlStatus, bookmarks.taggingStatus);
+ .from(importStagingBookmarks)
+ .where(eq(importStagingBookmarks.importSessionId, this.session.id))
+ .groupBy(importStagingBookmarks.status);
const stats = {
totalBookmarks: 0,
@@ -122,41 +96,27 @@ export class ImportSession {
processingBookmarks: 0,
};
- statusCounts.forEach((statusCount) => {
- const { crawlStatus, taggingStatus, count } = statusCount;
-
- stats.totalBookmarks += count;
-
- const isCrawlFailure = crawlStatus === "failure";
- const isTagFailure = taggingStatus === "failure";
- if (isCrawlFailure || isTagFailure) {
- stats.failedBookmarks += count;
- return;
- }
-
- const isCrawlPending = crawlStatus === "pending";
- const isTagPending = taggingStatus === "pending";
- if (isCrawlPending || isTagPending) {
- stats.pendingBookmarks += count;
- return;
- }
-
- const isCrawlSuccessfulOrNotRequired =
- crawlStatus === "success" || crawlStatus === null;
- const isTagSuccessfulOrUnknown =
- taggingStatus === "success" || taggingStatus === null;
-
- if (isCrawlSuccessfulOrNotRequired && isTagSuccessfulOrUnknown) {
- stats.completedBookmarks += count;
- } else {
- // Fallback to pending to avoid leaving imports unclassified
- stats.pendingBookmarks += count;
+ statusCounts.forEach(({ status, count: itemCount }) => {
+ stats.totalBookmarks += itemCount;
+
+ switch (status) {
+ case "pending":
+ stats.pendingBookmarks += itemCount;
+ break;
+ case "processing":
+ stats.processingBookmarks += itemCount;
+ break;
+ case "completed":
+ stats.completedBookmarks += itemCount;
+ break;
+ case "failed":
+ stats.failedBookmarks += itemCount;
+ break;
}
});
return {
...this.session,
- status: stats.pendingBookmarks > 0 ? "in_progress" : "completed",
...stats,
};
}
@@ -179,4 +139,92 @@ export class ImportSession {
});
}
}
+
+ async stageBookmarks(
+ bookmarks: {
+ type: "link" | "text" | "asset";
+ url?: string;
+ title?: string;
+ content?: string;
+ note?: string;
+ tags: string[];
+ listIds: string[];
+ sourceAddedAt?: Date;
+ }[],
+ ): Promise<void> {
+ if (this.session.status !== "staging") {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session not in staging status",
+ });
+ }
+
+ // Filter out invalid bookmarks (link without url, text without content)
+ const validBookmarks = bookmarks.filter((bookmark) => {
+ if (bookmark.type === "link" && !bookmark.url) return false;
+ if (bookmark.type === "text" && !bookmark.content) return false;
+ return true;
+ });
+
+ if (validBookmarks.length === 0) {
+ return;
+ }
+
+ await this.ctx.db.insert(importStagingBookmarks).values(
+ validBookmarks.map((bookmark) => ({
+ importSessionId: this.session.id,
+ type: bookmark.type,
+ url: bookmark.url,
+ title: bookmark.title,
+ content: bookmark.content,
+ note: bookmark.note,
+ tags: bookmark.tags,
+ listIds: bookmark.listIds,
+ sourceAddedAt: bookmark.sourceAddedAt,
+ status: "pending" as const,
+ })),
+ );
+ }
+
+ async finalize(): Promise<void> {
+ if (this.session.status !== "staging") {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session not in staging status",
+ });
+ }
+
+ await this.ctx.db
+ .update(importSessions)
+ .set({ status: "pending" })
+ .where(eq(importSessions.id, this.session.id));
+ }
+
+ async pause(): Promise<void> {
+ if (!["pending", "running"].includes(this.session.status)) {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session cannot be paused in current status",
+ });
+ }
+
+ await this.ctx.db
+ .update(importSessions)
+ .set({ status: "paused" })
+ .where(eq(importSessions.id, this.session.id));
+ }
+
+ async resume(): Promise<void> {
+ if (this.session.status !== "paused") {
+ throw new TRPCError({
+ code: "BAD_REQUEST",
+ message: "Session not paused",
+ });
+ }
+
+ await this.ctx.db
+ .update(importSessions)
+ .set({ status: "pending" })
+ .where(eq(importSessions.id, this.session.id));
+ }
}