Diffstat (limited to 'apps')

 -rw-r--r--  apps/web/components/settings/ImportSessionCard.tsx |  77
 -rw-r--r--  apps/web/lib/hooks/useBookmarkImport.ts             | 104
 -rw-r--r--  apps/web/lib/hooks/useImportSessions.ts             |  56
 -rw-r--r--  apps/web/lib/i18n/locales/en/translation.json       |  11
 -rw-r--r--  apps/workers/index.ts                               |  15
 -rw-r--r--  apps/workers/workers/importWorker.ts                | 565

6 files changed, 722 insertions(+), 106 deletions(-)
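One change worth calling out before the full diffs: in apps/web/lib/hooks/useImportSessions.ts the per-session stats query now passes a function as refetchInterval, so polling stops once a session reaches a terminal state instead of refetching every 5 seconds forever. A minimal standalone sketch of that TanStack Query pattern follows; the SessionStats shape and the fetchSessionStats() fetcher are placeholders introduced here for illustration, while the real hook wraps the tRPC query options.

    import { useQuery } from "@tanstack/react-query";

    // Placeholder type and fetcher for the sketch; not part of the commit.
    interface SessionStats {
      status: "staging" | "pending" | "running" | "paused" | "completed" | "failed";
    }
    declare function fetchSessionStats(importSessionId: string): Promise<SessionStats>;

    export function useSessionStatsPolling(importSessionId: string) {
      return useQuery({
        queryKey: ["import-session-stats", importSessionId],
        queryFn: () => fetchSessionStats(importSessionId),
        enabled: !!importSessionId,
        // Poll every 5 seconds while the session is still moving; returning
        // false disables the interval once it has completed or failed.
        refetchInterval: (query) =>
          !query.state.data ||
          !["completed", "failed"].includes(query.state.data.status)
            ? 5000
            : false,
      });
    }

Returning false from the refetchInterval callback is how TanStack Query disables interval refetching, so completed and failed sessions stop generating background requests.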
diff --git a/apps/web/components/settings/ImportSessionCard.tsx b/apps/web/components/settings/ImportSessionCard.tsx index 690caaa5..f20710ca 100644 --- a/apps/web/components/settings/ImportSessionCard.tsx +++ b/apps/web/components/settings/ImportSessionCard.tsx @@ -9,6 +9,8 @@ import { Progress } from "@/components/ui/progress"; import { useDeleteImportSession, useImportSessionStats, + usePauseImportSession, + useResumeImportSession, } from "@/lib/hooks/useImportSessions"; import { useTranslation } from "@/lib/i18n/client"; import { formatDistanceToNow } from "date-fns"; @@ -19,10 +21,17 @@ import { Clock, ExternalLink, Loader2, + Pause, + Play, Trash2, + Upload, } from "lucide-react"; -import type { ZImportSessionWithStats } from "@karakeep/shared/types/importSessions"; +import type { + ZImportSessionStatus, + ZImportSessionWithStats, +} from "@karakeep/shared/types/importSessions"; +import { switchCase } from "@karakeep/shared/utils/switch"; interface ImportSessionCardProps { session: ZImportSessionWithStats; @@ -30,10 +39,14 @@ interface ImportSessionCardProps { function getStatusColor(status: string) { switch (status) { + case "staging": + return "bg-purple-500/10 text-purple-700 dark:text-purple-400"; case "pending": return "bg-muted text-muted-foreground"; - case "in_progress": + case "running": return "bg-blue-500/10 text-blue-700 dark:text-blue-400"; + case "paused": + return "bg-yellow-500/10 text-yellow-700 dark:text-yellow-400"; case "completed": return "bg-green-500/10 text-green-700 dark:text-green-400"; case "failed": @@ -45,10 +58,14 @@ function getStatusColor(status: string) { function getStatusIcon(status: string) { switch (status) { + case "staging": + return <Upload className="h-4 w-4" />; case "pending": return <Clock className="h-4 w-4" />; - case "in_progress": + case "running": return <Loader2 className="h-4 w-4 animate-spin" />; + case "paused": + return <Pause className="h-4 w-4" />; case "completed": return <CheckCircle2 className="h-4 w-4" />; case "failed": @@ -62,13 +79,18 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) { const { t } = useTranslation(); const { data: liveStats } = useImportSessionStats(session.id); const deleteSession = useDeleteImportSession(); + const pauseSession = usePauseImportSession(); + const resumeSession = useResumeImportSession(); - const statusLabels: Record<string, string> = { - pending: t("settings.import_sessions.status.pending"), - in_progress: t("settings.import_sessions.status.in_progress"), - completed: t("settings.import_sessions.status.completed"), - failed: t("settings.import_sessions.status.failed"), - }; + const statusLabels = (s: ZImportSessionStatus) => + switchCase(s, { + staging: t("settings.import_sessions.status.staging"), + pending: t("settings.import_sessions.status.pending"), + running: t("settings.import_sessions.status.running"), + paused: t("settings.import_sessions.status.paused"), + completed: t("settings.import_sessions.status.completed"), + failed: t("settings.import_sessions.status.failed"), + }); // Use live stats if available, otherwise fallback to session stats const stats = liveStats || session; @@ -79,7 +101,14 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) { 100 : 0; - const canDelete = stats.status !== "in_progress"; + const canDelete = + stats.status === "completed" || + stats.status === "failed" || + stats.status === "paused"; + + const canPause = stats.status === "pending" || stats.status === "running"; + + const canResume = stats.status === 
"paused"; return ( <Card className="transition-all hover:shadow-md"> @@ -101,7 +130,7 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) { > {getStatusIcon(stats.status)} <span className="ml-1 capitalize"> - {statusLabels[stats.status] ?? stats.status.replace("_", " ")} + {statusLabels(stats.status)} </span> </Badge> </div> @@ -213,6 +242,32 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) { {/* Actions */} <div className="flex items-center justify-end pt-2"> <div className="flex items-center gap-2"> + {canPause && ( + <Button + variant="outline" + size="sm" + onClick={() => + pauseSession.mutate({ importSessionId: session.id }) + } + disabled={pauseSession.isPending} + > + <Pause className="mr-1 h-4 w-4" /> + {t("settings.import_sessions.pause_session")} + </Button> + )} + {canResume && ( + <Button + variant="outline" + size="sm" + onClick={() => + resumeSession.mutate({ importSessionId: session.id }) + } + disabled={resumeSession.isPending} + > + <Play className="mr-1 h-4 w-4" /> + {t("settings.import_sessions.resume_session")} + </Button> + )} {canDelete && ( <ActionConfirmingDialog title={t("settings.import_sessions.delete_dialog_title")} diff --git a/apps/web/lib/hooks/useBookmarkImport.ts b/apps/web/lib/hooks/useBookmarkImport.ts index c0681924..35c04c1b 100644 --- a/apps/web/lib/hooks/useBookmarkImport.ts +++ b/apps/web/lib/hooks/useBookmarkImport.ts @@ -5,25 +5,13 @@ import { toast } from "@/components/ui/sonner"; import { useTranslation } from "@/lib/i18n/client"; import { useMutation, useQueryClient } from "@tanstack/react-query"; -import { - useCreateBookmarkWithPostHook, - useUpdateBookmarkTags, -} from "@karakeep/shared-react/hooks/bookmarks"; -import { - useAddBookmarkToList, - useCreateBookmarkList, -} from "@karakeep/shared-react/hooks/lists"; +import { useCreateBookmarkList } from "@karakeep/shared-react/hooks/lists"; import { useTRPC } from "@karakeep/shared-react/trpc"; import { importBookmarksFromFile, ImportSource, - ParsedBookmark, parseImportFile, } from "@karakeep/shared/import-export"; -import { - BookmarkTypes, - MAX_BOOKMARK_TITLE_LENGTH, -} from "@karakeep/shared/types/bookmarks"; import { useCreateImportSession } from "./useImportSessions"; @@ -43,10 +31,13 @@ export function useBookmarkImport() { const queryClient = useQueryClient(); const { mutateAsync: createImportSession } = useCreateImportSession(); - const { mutateAsync: createBookmark } = useCreateBookmarkWithPostHook(); const { mutateAsync: createList } = useCreateBookmarkList(); - const { mutateAsync: addToList } = useAddBookmarkToList(); - const { mutateAsync: updateTags } = useUpdateBookmarkTags(); + const { mutateAsync: stageImportedBookmarks } = useMutation( + api.importSessions.stageImportedBookmarks.mutationOptions(), + ); + const { mutateAsync: finalizeImportStaging } = useMutation( + api.importSessions.finalizeImportStaging.mutationOptions(), + ); const uploadBookmarkFileMutation = useMutation({ mutationFn: async ({ @@ -86,7 +77,6 @@ export function useBookmarkImport() { } // Proceed with import if quota check passes - // Use a custom parser to avoid re-parsing the file const result = await importBookmarksFromFile( { file, @@ -95,65 +85,9 @@ export function useBookmarkImport() { deps: { createImportSession, createList, - createBookmark: async ( - bookmark: ParsedBookmark, - sessionId: string, - ) => { - if (bookmark.content === undefined) { - throw new Error("Content is undefined"); - } - const created = await createBookmark({ - crawlPriority: 
"low", - title: bookmark.title.substring(0, MAX_BOOKMARK_TITLE_LENGTH), - createdAt: bookmark.addDate - ? new Date(bookmark.addDate * 1000) - : undefined, - note: bookmark.notes, - archived: bookmark.archived, - importSessionId: sessionId, - source: "import", - ...(bookmark.content.type === BookmarkTypes.LINK - ? { - type: BookmarkTypes.LINK, - url: bookmark.content.url, - } - : { - type: BookmarkTypes.TEXT, - text: bookmark.content.text, - }), - }); - return created as { id: string; alreadyExists?: boolean }; - }, - addBookmarkToLists: async ({ - bookmarkId, - listIds, - }: { - bookmarkId: string; - listIds: string[]; - }) => { - await Promise.all( - listIds.map((listId) => - addToList({ - bookmarkId, - listId, - }), - ), - ); - }, - updateBookmarkTags: async ({ - bookmarkId, - tags, - }: { - bookmarkId: string; - tags: string[]; - }) => { - if (tags.length > 0) { - await updateTags({ - bookmarkId, - attach: tags.map((t) => ({ tagName: t })), - detach: [], - }); - } + stageImportedBookmarks, + finalizeImportStaging: async (sessionId: string) => { + await finalizeImportStaging({ importSessionId: sessionId }); }, }, onProgress: (done, total) => setImportProgress({ done, total }), @@ -174,19 +108,11 @@ export function useBookmarkImport() { toast({ description: "No bookmarks found in the file." }); return; } - const { successes, failures, alreadyExisted } = result.counts; - if (successes > 0 || alreadyExisted > 0) { - toast({ - description: `Imported ${successes} bookmarks into import session. Background processing will start automatically.`, - variant: "default", - }); - } - if (failures > 0) { - toast({ - description: `Failed to import ${failures} bookmarks. Check console for details.`, - variant: "destructive", - }); - } + + toast({ + description: `Staged ${result.counts.total} bookmarks for import. Background processing will start automatically.`, + variant: "default", + }); }, onError: (error) => { setImportProgress(null); diff --git a/apps/web/lib/hooks/useImportSessions.ts b/apps/web/lib/hooks/useImportSessions.ts index 133bb29b..4d095c0b 100644 --- a/apps/web/lib/hooks/useImportSessions.ts +++ b/apps/web/lib/hooks/useImportSessions.ts @@ -46,7 +46,11 @@ export function useImportSessionStats(importSessionId: string) { importSessionId, }, { - refetchInterval: 5000, // Refetch every 5 seconds to show progress + refetchInterval: (q) => + !q.state.data || + !["completed", "failed"].includes(q.state.data.status) + ? 
5000 + : false, // Refetch every 5 seconds to show progress enabled: !!importSessionId, }, ), @@ -77,3 +81,53 @@ export function useDeleteImportSession() { }), ); } + +export function usePauseImportSession() { + const api = useTRPC(); + const queryClient = useQueryClient(); + + return useMutation( + api.importSessions.pauseImportSession.mutationOptions({ + onSuccess: () => { + queryClient.invalidateQueries( + api.importSessions.listImportSessions.pathFilter(), + ); + toast({ + description: "Import session paused", + variant: "default", + }); + }, + onError: (error) => { + toast({ + description: error.message || "Failed to pause import session", + variant: "destructive", + }); + }, + }), + ); +} + +export function useResumeImportSession() { + const api = useTRPC(); + const queryClient = useQueryClient(); + + return useMutation( + api.importSessions.resumeImportSession.mutationOptions({ + onSuccess: () => { + queryClient.invalidateQueries( + api.importSessions.listImportSessions.pathFilter(), + ); + toast({ + description: "Import session resumed", + variant: "default", + }); + }, + onError: (error) => { + toast({ + description: error.message || "Failed to resume import session", + variant: "destructive", + }); + }, + }), + ); +} diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json index ce607920..59d2098e 100644 --- a/apps/web/lib/i18n/locales/en/translation.json +++ b/apps/web/lib/i18n/locales/en/translation.json @@ -416,11 +416,12 @@ "created_at": "Created {{time}}", "progress": "Progress", "status": { + "staging": "Staging", "pending": "Pending", - "in_progress": "In progress", + "running": "Running", + "paused": "Paused", "completed": "Completed", - "failed": "Failed", - "processing": "Processing" + "failed": "Failed" }, "badges": { "pending": "{{count}} pending", @@ -432,7 +433,9 @@ "view_list": "View List", "delete_dialog_title": "Delete Import Session", "delete_dialog_description": "Are you sure you want to delete \"{{name}}\"? This action cannot be undone. 
The bookmarks themselves will not be deleted.", - "delete_session": "Delete Session" + "delete_session": "Delete Session", + "pause_session": "Pause", + "resume_session": "Resume" }, "backups": { "backups": "Backups", diff --git a/apps/workers/index.ts b/apps/workers/index.ts index dfbac85b..931a505f 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -28,6 +28,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker"; import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker"; import { CrawlerWorker } from "./workers/crawlerWorker"; import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker"; +import { ImportWorker } from "./workers/importWorker"; import { OpenAiWorker } from "./workers/inference/inferenceWorker"; import { RuleEngineWorker } from "./workers/ruleEngineWorker"; import { SearchIndexingWorker } from "./workers/searchWorker"; @@ -77,7 +78,7 @@ const workerBuilders = { }, } as const; -type WorkerName = keyof typeof workerBuilders; +type WorkerName = keyof typeof workerBuilders | "import"; const enabledWorkers = new Set(serverConfig.workers.enabledWorkers); const disabledWorkers = new Set(serverConfig.workers.disabledWorkers); @@ -118,10 +119,19 @@ async function main() { BackupSchedulingWorker.start(); } + // Start import polling worker + let importWorker: ImportWorker | null = null; + let importWorkerPromise: Promise<void> | null = null; + if (isWorkerEnabled("import")) { + importWorker = new ImportWorker(); + importWorkerPromise = importWorker.start(); + } + await Promise.any([ Promise.all([ ...workers.map(({ worker }) => worker.run()), httpServer.serve(), + ...(importWorkerPromise ? [importWorkerPromise] : []), ]), shutdownPromise, ]); @@ -136,6 +146,9 @@ async function main() { if (workers.some((w) => w.name === "backup")) { BackupSchedulingWorker.stop(); } + if (importWorker) { + importWorker.stop(); + } for (const { worker } of workers) { worker.stop(); } diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts new file mode 100644 index 00000000..a717fd9d --- /dev/null +++ b/apps/workers/workers/importWorker.ts @@ -0,0 +1,565 @@ +import { + and, + count, + eq, + inArray, + isNotNull, + isNull, + lt, + or, +} from "drizzle-orm"; +import { Counter, Gauge, Histogram } from "prom-client"; +import { buildImpersonatingTRPCClient } from "trpc"; + +import { db } from "@karakeep/db"; +import { + bookmarkLinks, + bookmarks, + importSessions, + importStagingBookmarks, +} from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; + +// Prometheus metrics +const importStagingProcessedCounter = new Counter({ + name: "import_staging_processed_total", + help: "Total number of staged items processed", + labelNames: ["result"], +}); + +const importStagingStaleResetCounter = new Counter({ + name: "import_staging_stale_reset_total", + help: "Total number of stale processing items reset to pending", +}); + +const importStagingInFlightGauge = new Gauge({ + name: "import_staging_in_flight", + help: "Current number of in-flight items (processing + recently completed)", +}); + +const importSessionsGauge = new Gauge({ + name: "import_sessions_active", + help: "Number of active import sessions by status", + labelNames: ["status"], +}); + +const importStagingPendingGauge = new Gauge({ + name: "import_staging_pending_total", + help: "Total number of pending items in staging table", +}); + +const 
importBatchDurationHistogram = new Histogram({ + name: "import_batch_duration_seconds", + help: "Time taken to process a batch of staged items", + buckets: [0.1, 0.5, 1, 2, 5, 10, 30], +}); + +function sleep(ms: number): Promise<void> { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +export class ImportWorker { + private running = false; + private pollIntervalMs = 1000; + + // Backpressure settings + private maxInFlight = 50; + private batchSize = 10; + private staleThresholdMs = 60 * 60 * 1000; // 1 hour + + async start() { + this.running = true; + let iterationCount = 0; + + logger.info("[import] Starting import polling worker"); + + while (this.running) { + try { + // Periodically reset stale processing items (every 60 iterations ~= 1 min) + if (iterationCount % 60 === 0) { + await this.resetStaleProcessingItems(); + } + iterationCount++; + + // Check if any processing items have completed downstream work + await this.checkAndCompleteProcessingItems(); + + const processed = await this.processBatch(); + if (processed === 0) { + await this.checkAndCompleteIdleSessions(); + // Nothing to do, wait before polling again + await sleep(this.pollIntervalMs); + } + } catch (error) { + logger.error(`[import] Error in polling loop: ${error}`); + await sleep(this.pollIntervalMs); + } + } + } + + stop() { + logger.info("[import] Stopping import polling worker"); + this.running = false; + } + + private async processBatch(): Promise<number> { + // 1. Check backpressure - count in-flight + recently completed items + const availableCapacity = await this.getAvailableCapacity(); + importStagingInFlightGauge.set(this.maxInFlight - availableCapacity); + + if (availableCapacity <= 0) { + // At capacity, wait before trying again + return 0; + } + + // 2. Get candidate IDs with fair scheduling across users + const batchLimit = Math.min(this.batchSize, availableCapacity); + const candidateIds = await this.getNextBatchFairly(batchLimit); + + if (candidateIds.length === 0) return 0; + + // 3. Atomically claim rows - only rows still pending will be claimed + // This prevents race conditions where multiple workers select the same rows + const batch = await db + .update(importStagingBookmarks) + .set({ status: "processing", processingStartedAt: new Date() }) + .where( + and( + eq(importStagingBookmarks.status, "pending"), + inArray(importStagingBookmarks.id, candidateIds), + ), + ) + .returning(); + + // If no rows were claimed (another worker got them first), skip processing + if (batch.length === 0) return 0; + + const batchTimer = importBatchDurationHistogram.startTimer(); + + // 4. Mark session(s) as running (using claimed rows, not candidates) + const sessionIds = [...new Set(batch.map((b) => b.importSessionId))]; + await db + .update(importSessions) + .set({ status: "running" }) + .where( + and( + inArray(importSessions.id, sessionIds), + eq(importSessions.status, "pending"), + ), + ); + + // 5. Process in parallel + await Promise.allSettled( + batch.map((staged) => this.processOneBookmark(staged)), + ); + + // 6. 
Check if any sessions are now complete + await this.checkAndCompleteEmptySessions(sessionIds); + + batchTimer(); // Record batch duration + await this.updateGauges(); // Update pending/session gauges + + return batch.length; + } + + private async updateGauges() { + // Update pending items gauge + const pending = await db + .select({ count: count() }) + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.status, "pending")); + importStagingPendingGauge.set(pending[0]?.count ?? 0); + + // Update active sessions gauge by status + const sessions = await db + .select({ + status: importSessions.status, + count: count(), + }) + .from(importSessions) + .where( + inArray(importSessions.status, [ + "staging", + "pending", + "running", + "paused", + ]), + ) + .groupBy(importSessions.status); + + // Reset all status gauges to 0 first + for (const status of ["staging", "pending", "running", "paused"]) { + importSessionsGauge.set({ status }, 0); + } + + // Set actual values + for (const s of sessions) { + importSessionsGauge.set({ status: s.status }, s.count); + } + } + + private async checkAndCompleteIdleSessions() { + const sessions = await db + .select({ id: importSessions.id }) + .from(importSessions) + .where(inArray(importSessions.status, ["pending", "running"])); + + const sessionIds = sessions.map((session) => session.id); + if (sessionIds.length === 0) { + return; + } + + await this.checkAndCompleteEmptySessions(sessionIds); + } + + private async getNextBatchFairly(limit: number): Promise<string[]> { + // Query pending item IDs from active sessions, ordered by: + // 1. User's last-served timestamp (fairness) + // 2. Staging item creation time (FIFO within user) + // Returns only IDs - actual rows will be fetched atomically during claim + const results = await db + .select({ + id: importStagingBookmarks.id, + }) + .from(importStagingBookmarks) + .innerJoin( + importSessions, + eq(importStagingBookmarks.importSessionId, importSessions.id), + ) + .where( + and( + eq(importStagingBookmarks.status, "pending"), + inArray(importSessions.status, ["pending", "running"]), + ), + ) + .orderBy(importSessions.lastProcessedAt, importStagingBookmarks.createdAt) + .limit(limit); + + return results.map((r) => r.id); + } + + private async attachBookmarkToLists( + caller: Awaited<ReturnType<typeof buildImpersonatingTRPCClient>>, + session: typeof importSessions.$inferSelect, + staged: typeof importStagingBookmarks.$inferSelect, + bookmarkId: string, + ): Promise<void> { + const listIds = new Set<string>(); + + if (session.rootListId) { + listIds.add(session.rootListId); + } + + if (staged.listIds && staged.listIds.length > 0) { + for (const listId of staged.listIds) { + listIds.add(listId); + } + } + + for (const listId of listIds) { + try { + await caller.lists.addToList({ listId, bookmarkId }); + } catch (error) { + logger.warn( + `[import] Failed to add bookmark ${bookmarkId} to list ${listId}: ${error}`, + ); + } + } + } + + private async processOneBookmark( + staged: typeof importStagingBookmarks.$inferSelect, + ) { + const session = await db.query.importSessions.findFirst({ + where: eq(importSessions.id, staged.importSessionId), + }); + + if (!session || session.status === "paused") { + // Session paused mid-batch, reset item to pending + await db + .update(importStagingBookmarks) + .set({ status: "pending" }) + .where(eq(importStagingBookmarks.id, staged.id)); + return; + } + + try { + // Use existing tRPC mutation via internal caller + // Note: Duplicate detection is handled by createBookmark 
itself + const caller = await buildImpersonatingTRPCClient(session.userId); + + // Build the request based on bookmark type + type CreateBookmarkInput = Parameters< + typeof caller.bookmarks.createBookmark + >[0]; + + const baseRequest = { + title: staged.title ?? undefined, + note: staged.note ?? undefined, + createdAt: staged.sourceAddedAt ?? undefined, + crawlPriority: "low" as const, + }; + + let bookmarkRequest: CreateBookmarkInput; + + if (staged.type === "link") { + if (!staged.url) { + throw new Error("URL is required for link bookmarks"); + } + bookmarkRequest = { + ...baseRequest, + type: BookmarkTypes.LINK, + url: staged.url, + }; + } else if (staged.type === "text") { + if (!staged.content) { + throw new Error("Content is required for text bookmarks"); + } + bookmarkRequest = { + ...baseRequest, + type: BookmarkTypes.TEXT, + text: staged.content, + }; + } else { + // asset type - skip for now as it needs special handling + logger.warn( + `[import] Asset bookmarks not yet supported in import worker: ${staged.id}`, + ); + await db + .update(importStagingBookmarks) + .set({ + status: "failed", + result: "rejected", + resultReason: "Asset bookmarks not yet supported", + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, staged.id)); + await this.updateSessionLastProcessedAt(staged.importSessionId); + return; + } + + const result = await caller.bookmarks.createBookmark(bookmarkRequest); + + // Apply tags via existing mutation (for both new and duplicate bookmarks) + if (staged.tags && staged.tags.length > 0) { + await caller.bookmarks.updateTags({ + bookmarkId: result.id, + attach: staged.tags.map((t) => ({ tagName: t })), + detach: [], + }); + } + + // Handle duplicate case (createBookmark returns alreadyExists: true) + if (result.alreadyExists) { + await db + .update(importStagingBookmarks) + .set({ + status: "completed", + result: "skipped_duplicate", + resultReason: "URL already exists", + resultBookmarkId: result.id, + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, staged.id)); + + importStagingProcessedCounter.inc({ result: "skipped_duplicate" }); + await this.attachBookmarkToLists(caller, session, staged, result.id); + await this.updateSessionLastProcessedAt(staged.importSessionId); + return; + } + + // Mark as accepted but keep in "processing" until crawl/tag is done + // The item will be moved to "completed" by checkAndCompleteProcessingItems() + await db + .update(importStagingBookmarks) + .set({ + result: "accepted", + resultBookmarkId: result.id, + }) + .where(eq(importStagingBookmarks.id, staged.id)); + + await this.attachBookmarkToLists(caller, session, staged, result.id); + + await this.updateSessionLastProcessedAt(staged.importSessionId); + } catch (error) { + logger.error( + `[import] Error processing staged item ${staged.id}: ${error}`, + ); + await db + .update(importStagingBookmarks) + .set({ + status: "failed", + result: "rejected", + resultReason: String(error), + completedAt: new Date(), + }) + .where(eq(importStagingBookmarks.id, staged.id)); + + importStagingProcessedCounter.inc({ result: "rejected" }); + await this.updateSessionLastProcessedAt(staged.importSessionId); + } + } + + private async updateSessionLastProcessedAt(sessionId: string) { + await db + .update(importSessions) + .set({ lastProcessedAt: new Date() }) + .where(eq(importSessions.id, sessionId)); + } + + private async checkAndCompleteEmptySessions(sessionIds: string[]) { + for (const sessionId of sessionIds) { + const remaining = await db + .select({ 
count: count() }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.importSessionId, sessionId), + inArray(importStagingBookmarks.status, ["pending", "processing"]), + ), + ); + + if (remaining[0]?.count === 0) { + await db + .update(importSessions) + .set({ status: "completed" }) + .where(eq(importSessions.id, sessionId)); + } + } + } + + /** + * Check processing items that have a bookmark created and mark them as completed + * once downstream processing (crawling/tagging) is done. + */ + private async checkAndCompleteProcessingItems(): Promise<number> { + // Find processing items where: + // - A bookmark was created (resultBookmarkId is set) + // - Downstream processing is complete (crawl/tag not pending) + const completedItems = await db + .select({ + id: importStagingBookmarks.id, + importSessionId: importStagingBookmarks.importSessionId, + }) + .from(importStagingBookmarks) + .leftJoin( + bookmarks, + eq(bookmarks.id, importStagingBookmarks.resultBookmarkId), + ) + .leftJoin( + bookmarkLinks, + eq(bookmarkLinks.id, importStagingBookmarks.resultBookmarkId), + ) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + isNotNull(importStagingBookmarks.resultBookmarkId), + // Crawl is done (not pending) - either success, failure, or null (not a link) + or( + isNull(bookmarkLinks.crawlStatus), + eq(bookmarkLinks.crawlStatus, "success"), + eq(bookmarkLinks.crawlStatus, "failure"), + ), + // Tagging is done (not pending) - either success, failure, or null + or( + isNull(bookmarks.taggingStatus), + eq(bookmarks.taggingStatus, "success"), + eq(bookmarks.taggingStatus, "failure"), + ), + ), + ); + + if (completedItems.length === 0) { + return 0; + } + + // Mark them as completed + await db + .update(importStagingBookmarks) + .set({ + status: "completed", + completedAt: new Date(), + }) + .where( + inArray( + importStagingBookmarks.id, + completedItems.map((i) => i.id), + ), + ); + + // Increment counter for completed items + importStagingProcessedCounter.inc( + { result: "accepted" }, + completedItems.length, + ); + + // Check if any sessions are now complete + const sessionIds = [ + ...new Set(completedItems.map((i) => i.importSessionId)), + ]; + await this.checkAndCompleteEmptySessions(sessionIds); + + return completedItems.length; + } + + /** + * Backpressure: Calculate available capacity. + * Counts items currently in "processing" status, which includes both: + * - Items being actively imported + * - Items waiting for downstream crawl/tag to complete + */ + private async getAvailableCapacity(): Promise<number> { + const processing = await db + .select({ count: count() }) + .from(importStagingBookmarks) + .where(eq(importStagingBookmarks.status, "processing")); + + return this.maxInFlight - (processing[0]?.count ?? 0); + } + + /** + * Reset stale "processing" items back to "pending" so they can be retried. + * Called periodically to handle crashed workers or stuck items. + * + * Only resets items that don't have a resultBookmarkId - those with a bookmark + * are waiting for downstream processing (crawl/tag), not stale. 
+ */ + private async resetStaleProcessingItems(): Promise<number> { + const staleThreshold = new Date(Date.now() - this.staleThresholdMs); + + const staleItems = await db + .select({ id: importStagingBookmarks.id }) + .from(importStagingBookmarks) + .where( + and( + eq(importStagingBookmarks.status, "processing"), + lt(importStagingBookmarks.processingStartedAt, staleThreshold), + // Only reset items that haven't created a bookmark yet + // Items with a bookmark are waiting for downstream, not stale + isNull(importStagingBookmarks.resultBookmarkId), + ), + ); + + if (staleItems.length > 0) { + logger.warn( + `[import] Resetting ${staleItems.length} stale processing items`, + ); + + await db + .update(importStagingBookmarks) + .set({ status: "pending", processingStartedAt: null }) + .where( + inArray( + importStagingBookmarks.id, + staleItems.map((i) => i.id), + ), + ); + + importStagingStaleResetCounter.inc(staleItems.length); + return staleItems.length; + } + + return 0; + } +} |
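The core concurrency mechanism in the new importWorker.ts is the claim step inside processBatch(): getNextBatchFairly() selects candidate IDs ordered by each session's lastProcessedAt, and a single conditional UPDATE then flips those rows from pending to processing, so two pollers can never end up processing the same staged bookmark. Below is a condensed, self-contained sketch of just that step, using the same drizzle-orm calls as the worker above; claimPendingRows is a name introduced here for illustration, not part of the commit.

    import { and, eq, inArray } from "drizzle-orm";

    import { db } from "@karakeep/db";
    import { importStagingBookmarks } from "@karakeep/db/schema";

    export async function claimPendingRows(candidateIds: string[]) {
      if (candidateIds.length === 0) {
        return [];
      }
      // Conditional claim: only rows that are still "pending" are updated.
      // Rows already grabbed by a concurrent worker no longer match the
      // status predicate, so .returning() yields only the rows this call
      // actually owns.
      return db
        .update(importStagingBookmarks)
        .set({ status: "processing", processingStartedAt: new Date() })
        .where(
          and(
            eq(importStagingBookmarks.status, "pending"),
            inArray(importStagingBookmarks.id, candidateIds),
          ),
        )
        .returning();
    }

Stale claims are handled separately: resetStaleProcessingItems() flips rows back to pending when processingStartedAt is more than an hour old and no bookmark was created yet, so work claimed by a crashed worker eventually gets retried.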
