aboutsummaryrefslogtreecommitdiffstats
path: root/apps
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2026-02-04 09:44:18 +0000
committerGitHub <noreply@github.com>2026-02-04 09:44:18 +0000
commit3c838ddb26c1e86d3f201ce71f13c834be705f69 (patch)
tree892fe4f8cd2ca01d6e4cd34f677fc16aa2fd63f6 /apps
parent3fcccb858ee3ef22fe9ce479af4ce458ac9a0fe1 (diff)
downloadkarakeep-3c838ddb26c1e86d3f201ce71f13c834be705f69.tar.zst
feat: Import workflow v3 (#2378)
* feat: import workflow v3 * batch stage * revert migration * cleanups * pr comments * move to models * add allowed workers * e2e tests * import list ids * add missing indicies * merge test * more fixes * add resume/pause to UI * fix ui states * fix tests * simplify progress tracking * remove backpressure * fix list imports * fix race on claiming bookmarks * remove the codex file
Diffstat (limited to 'apps')
-rw-r--r--apps/web/components/settings/ImportSessionCard.tsx77
-rw-r--r--apps/web/lib/hooks/useBookmarkImport.ts104
-rw-r--r--apps/web/lib/hooks/useImportSessions.ts56
-rw-r--r--apps/web/lib/i18n/locales/en/translation.json11
-rw-r--r--apps/workers/index.ts15
-rw-r--r--apps/workers/workers/importWorker.ts565
6 files changed, 722 insertions, 106 deletions
diff --git a/apps/web/components/settings/ImportSessionCard.tsx b/apps/web/components/settings/ImportSessionCard.tsx
index 690caaa5..f20710ca 100644
--- a/apps/web/components/settings/ImportSessionCard.tsx
+++ b/apps/web/components/settings/ImportSessionCard.tsx
@@ -9,6 +9,8 @@ import { Progress } from "@/components/ui/progress";
import {
useDeleteImportSession,
useImportSessionStats,
+ usePauseImportSession,
+ useResumeImportSession,
} from "@/lib/hooks/useImportSessions";
import { useTranslation } from "@/lib/i18n/client";
import { formatDistanceToNow } from "date-fns";
@@ -19,10 +21,17 @@ import {
Clock,
ExternalLink,
Loader2,
+ Pause,
+ Play,
Trash2,
+ Upload,
} from "lucide-react";
-import type { ZImportSessionWithStats } from "@karakeep/shared/types/importSessions";
+import type {
+ ZImportSessionStatus,
+ ZImportSessionWithStats,
+} from "@karakeep/shared/types/importSessions";
+import { switchCase } from "@karakeep/shared/utils/switch";
interface ImportSessionCardProps {
session: ZImportSessionWithStats;
@@ -30,10 +39,14 @@ interface ImportSessionCardProps {
function getStatusColor(status: string) {
switch (status) {
+ case "staging":
+ return "bg-purple-500/10 text-purple-700 dark:text-purple-400";
case "pending":
return "bg-muted text-muted-foreground";
- case "in_progress":
+ case "running":
return "bg-blue-500/10 text-blue-700 dark:text-blue-400";
+ case "paused":
+ return "bg-yellow-500/10 text-yellow-700 dark:text-yellow-400";
case "completed":
return "bg-green-500/10 text-green-700 dark:text-green-400";
case "failed":
@@ -45,10 +58,14 @@ function getStatusColor(status: string) {
function getStatusIcon(status: string) {
switch (status) {
+ case "staging":
+ return <Upload className="h-4 w-4" />;
case "pending":
return <Clock className="h-4 w-4" />;
- case "in_progress":
+ case "running":
return <Loader2 className="h-4 w-4 animate-spin" />;
+ case "paused":
+ return <Pause className="h-4 w-4" />;
case "completed":
return <CheckCircle2 className="h-4 w-4" />;
case "failed":
@@ -62,13 +79,18 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) {
const { t } = useTranslation();
const { data: liveStats } = useImportSessionStats(session.id);
const deleteSession = useDeleteImportSession();
+ const pauseSession = usePauseImportSession();
+ const resumeSession = useResumeImportSession();
- const statusLabels: Record<string, string> = {
- pending: t("settings.import_sessions.status.pending"),
- in_progress: t("settings.import_sessions.status.in_progress"),
- completed: t("settings.import_sessions.status.completed"),
- failed: t("settings.import_sessions.status.failed"),
- };
+ const statusLabels = (s: ZImportSessionStatus) =>
+ switchCase(s, {
+ staging: t("settings.import_sessions.status.staging"),
+ pending: t("settings.import_sessions.status.pending"),
+ running: t("settings.import_sessions.status.running"),
+ paused: t("settings.import_sessions.status.paused"),
+ completed: t("settings.import_sessions.status.completed"),
+ failed: t("settings.import_sessions.status.failed"),
+ });
// Use live stats if available, otherwise fallback to session stats
const stats = liveStats || session;
@@ -79,7 +101,14 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) {
100
: 0;
- const canDelete = stats.status !== "in_progress";
+ const canDelete =
+ stats.status === "completed" ||
+ stats.status === "failed" ||
+ stats.status === "paused";
+
+ const canPause = stats.status === "pending" || stats.status === "running";
+
+ const canResume = stats.status === "paused";
return (
<Card className="transition-all hover:shadow-md">
@@ -101,7 +130,7 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) {
>
{getStatusIcon(stats.status)}
<span className="ml-1 capitalize">
- {statusLabels[stats.status] ?? stats.status.replace("_", " ")}
+ {statusLabels(stats.status)}
</span>
</Badge>
</div>
@@ -213,6 +242,32 @@ export function ImportSessionCard({ session }: ImportSessionCardProps) {
{/* Actions */}
<div className="flex items-center justify-end pt-2">
<div className="flex items-center gap-2">
+ {canPause && (
+ <Button
+ variant="outline"
+ size="sm"
+ onClick={() =>
+ pauseSession.mutate({ importSessionId: session.id })
+ }
+ disabled={pauseSession.isPending}
+ >
+ <Pause className="mr-1 h-4 w-4" />
+ {t("settings.import_sessions.pause_session")}
+ </Button>
+ )}
+ {canResume && (
+ <Button
+ variant="outline"
+ size="sm"
+ onClick={() =>
+ resumeSession.mutate({ importSessionId: session.id })
+ }
+ disabled={resumeSession.isPending}
+ >
+ <Play className="mr-1 h-4 w-4" />
+ {t("settings.import_sessions.resume_session")}
+ </Button>
+ )}
{canDelete && (
<ActionConfirmingDialog
title={t("settings.import_sessions.delete_dialog_title")}
diff --git a/apps/web/lib/hooks/useBookmarkImport.ts b/apps/web/lib/hooks/useBookmarkImport.ts
index c0681924..35c04c1b 100644
--- a/apps/web/lib/hooks/useBookmarkImport.ts
+++ b/apps/web/lib/hooks/useBookmarkImport.ts
@@ -5,25 +5,13 @@ import { toast } from "@/components/ui/sonner";
import { useTranslation } from "@/lib/i18n/client";
import { useMutation, useQueryClient } from "@tanstack/react-query";
-import {
- useCreateBookmarkWithPostHook,
- useUpdateBookmarkTags,
-} from "@karakeep/shared-react/hooks/bookmarks";
-import {
- useAddBookmarkToList,
- useCreateBookmarkList,
-} from "@karakeep/shared-react/hooks/lists";
+import { useCreateBookmarkList } from "@karakeep/shared-react/hooks/lists";
import { useTRPC } from "@karakeep/shared-react/trpc";
import {
importBookmarksFromFile,
ImportSource,
- ParsedBookmark,
parseImportFile,
} from "@karakeep/shared/import-export";
-import {
- BookmarkTypes,
- MAX_BOOKMARK_TITLE_LENGTH,
-} from "@karakeep/shared/types/bookmarks";
import { useCreateImportSession } from "./useImportSessions";
@@ -43,10 +31,13 @@ export function useBookmarkImport() {
const queryClient = useQueryClient();
const { mutateAsync: createImportSession } = useCreateImportSession();
- const { mutateAsync: createBookmark } = useCreateBookmarkWithPostHook();
const { mutateAsync: createList } = useCreateBookmarkList();
- const { mutateAsync: addToList } = useAddBookmarkToList();
- const { mutateAsync: updateTags } = useUpdateBookmarkTags();
+ const { mutateAsync: stageImportedBookmarks } = useMutation(
+ api.importSessions.stageImportedBookmarks.mutationOptions(),
+ );
+ const { mutateAsync: finalizeImportStaging } = useMutation(
+ api.importSessions.finalizeImportStaging.mutationOptions(),
+ );
const uploadBookmarkFileMutation = useMutation({
mutationFn: async ({
@@ -86,7 +77,6 @@ export function useBookmarkImport() {
}
// Proceed with import if quota check passes
- // Use a custom parser to avoid re-parsing the file
const result = await importBookmarksFromFile(
{
file,
@@ -95,65 +85,9 @@ export function useBookmarkImport() {
deps: {
createImportSession,
createList,
- createBookmark: async (
- bookmark: ParsedBookmark,
- sessionId: string,
- ) => {
- if (bookmark.content === undefined) {
- throw new Error("Content is undefined");
- }
- const created = await createBookmark({
- crawlPriority: "low",
- title: bookmark.title.substring(0, MAX_BOOKMARK_TITLE_LENGTH),
- createdAt: bookmark.addDate
- ? new Date(bookmark.addDate * 1000)
- : undefined,
- note: bookmark.notes,
- archived: bookmark.archived,
- importSessionId: sessionId,
- source: "import",
- ...(bookmark.content.type === BookmarkTypes.LINK
- ? {
- type: BookmarkTypes.LINK,
- url: bookmark.content.url,
- }
- : {
- type: BookmarkTypes.TEXT,
- text: bookmark.content.text,
- }),
- });
- return created as { id: string; alreadyExists?: boolean };
- },
- addBookmarkToLists: async ({
- bookmarkId,
- listIds,
- }: {
- bookmarkId: string;
- listIds: string[];
- }) => {
- await Promise.all(
- listIds.map((listId) =>
- addToList({
- bookmarkId,
- listId,
- }),
- ),
- );
- },
- updateBookmarkTags: async ({
- bookmarkId,
- tags,
- }: {
- bookmarkId: string;
- tags: string[];
- }) => {
- if (tags.length > 0) {
- await updateTags({
- bookmarkId,
- attach: tags.map((t) => ({ tagName: t })),
- detach: [],
- });
- }
+ stageImportedBookmarks,
+ finalizeImportStaging: async (sessionId: string) => {
+ await finalizeImportStaging({ importSessionId: sessionId });
},
},
onProgress: (done, total) => setImportProgress({ done, total }),
@@ -174,19 +108,11 @@ export function useBookmarkImport() {
toast({ description: "No bookmarks found in the file." });
return;
}
- const { successes, failures, alreadyExisted } = result.counts;
- if (successes > 0 || alreadyExisted > 0) {
- toast({
- description: `Imported ${successes} bookmarks into import session. Background processing will start automatically.`,
- variant: "default",
- });
- }
- if (failures > 0) {
- toast({
- description: `Failed to import ${failures} bookmarks. Check console for details.`,
- variant: "destructive",
- });
- }
+
+ toast({
+ description: `Staged ${result.counts.total} bookmarks for import. Background processing will start automatically.`,
+ variant: "default",
+ });
},
onError: (error) => {
setImportProgress(null);
diff --git a/apps/web/lib/hooks/useImportSessions.ts b/apps/web/lib/hooks/useImportSessions.ts
index 133bb29b..4d095c0b 100644
--- a/apps/web/lib/hooks/useImportSessions.ts
+++ b/apps/web/lib/hooks/useImportSessions.ts
@@ -46,7 +46,11 @@ export function useImportSessionStats(importSessionId: string) {
importSessionId,
},
{
- refetchInterval: 5000, // Refetch every 5 seconds to show progress
+ refetchInterval: (q) =>
+ !q.state.data ||
+ !["completed", "failed"].includes(q.state.data.status)
+ ? 5000
+ : false, // Refetch every 5 seconds to show progress
enabled: !!importSessionId,
},
),
@@ -77,3 +81,53 @@ export function useDeleteImportSession() {
}),
);
}
+
+export function usePauseImportSession() {
+ const api = useTRPC();
+ const queryClient = useQueryClient();
+
+ return useMutation(
+ api.importSessions.pauseImportSession.mutationOptions({
+ onSuccess: () => {
+ queryClient.invalidateQueries(
+ api.importSessions.listImportSessions.pathFilter(),
+ );
+ toast({
+ description: "Import session paused",
+ variant: "default",
+ });
+ },
+ onError: (error) => {
+ toast({
+ description: error.message || "Failed to pause import session",
+ variant: "destructive",
+ });
+ },
+ }),
+ );
+}
+
+export function useResumeImportSession() {
+ const api = useTRPC();
+ const queryClient = useQueryClient();
+
+ return useMutation(
+ api.importSessions.resumeImportSession.mutationOptions({
+ onSuccess: () => {
+ queryClient.invalidateQueries(
+ api.importSessions.listImportSessions.pathFilter(),
+ );
+ toast({
+ description: "Import session resumed",
+ variant: "default",
+ });
+ },
+ onError: (error) => {
+ toast({
+ description: error.message || "Failed to resume import session",
+ variant: "destructive",
+ });
+ },
+ }),
+ );
+}
diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json
index ce607920..59d2098e 100644
--- a/apps/web/lib/i18n/locales/en/translation.json
+++ b/apps/web/lib/i18n/locales/en/translation.json
@@ -416,11 +416,12 @@
"created_at": "Created {{time}}",
"progress": "Progress",
"status": {
+ "staging": "Staging",
"pending": "Pending",
- "in_progress": "In progress",
+ "running": "Running",
+ "paused": "Paused",
"completed": "Completed",
- "failed": "Failed",
- "processing": "Processing"
+ "failed": "Failed"
},
"badges": {
"pending": "{{count}} pending",
@@ -432,7 +433,9 @@
"view_list": "View List",
"delete_dialog_title": "Delete Import Session",
"delete_dialog_description": "Are you sure you want to delete \"{{name}}\"? This action cannot be undone. The bookmarks themselves will not be deleted.",
- "delete_session": "Delete Session"
+ "delete_session": "Delete Session",
+ "pause_session": "Pause",
+ "resume_session": "Resume"
},
"backups": {
"backups": "Backups",
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index dfbac85b..931a505f 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -28,6 +28,7 @@ import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
+import { ImportWorker } from "./workers/importWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
import { RuleEngineWorker } from "./workers/ruleEngineWorker";
import { SearchIndexingWorker } from "./workers/searchWorker";
@@ -77,7 +78,7 @@ const workerBuilders = {
},
} as const;
-type WorkerName = keyof typeof workerBuilders;
+type WorkerName = keyof typeof workerBuilders | "import";
const enabledWorkers = new Set(serverConfig.workers.enabledWorkers);
const disabledWorkers = new Set(serverConfig.workers.disabledWorkers);
@@ -118,10 +119,19 @@ async function main() {
BackupSchedulingWorker.start();
}
+ // Start import polling worker
+ let importWorker: ImportWorker | null = null;
+ let importWorkerPromise: Promise<void> | null = null;
+ if (isWorkerEnabled("import")) {
+ importWorker = new ImportWorker();
+ importWorkerPromise = importWorker.start();
+ }
+
await Promise.any([
Promise.all([
...workers.map(({ worker }) => worker.run()),
httpServer.serve(),
+ ...(importWorkerPromise ? [importWorkerPromise] : []),
]),
shutdownPromise,
]);
@@ -136,6 +146,9 @@ async function main() {
if (workers.some((w) => w.name === "backup")) {
BackupSchedulingWorker.stop();
}
+ if (importWorker) {
+ importWorker.stop();
+ }
for (const { worker } of workers) {
worker.stop();
}
diff --git a/apps/workers/workers/importWorker.ts b/apps/workers/workers/importWorker.ts
new file mode 100644
index 00000000..a717fd9d
--- /dev/null
+++ b/apps/workers/workers/importWorker.ts
@@ -0,0 +1,565 @@
+import {
+ and,
+ count,
+ eq,
+ inArray,
+ isNotNull,
+ isNull,
+ lt,
+ or,
+} from "drizzle-orm";
+import { Counter, Gauge, Histogram } from "prom-client";
+import { buildImpersonatingTRPCClient } from "trpc";
+
+import { db } from "@karakeep/db";
+import {
+ bookmarkLinks,
+ bookmarks,
+ importSessions,
+ importStagingBookmarks,
+} from "@karakeep/db/schema";
+import logger from "@karakeep/shared/logger";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+
+// Prometheus metrics
+const importStagingProcessedCounter = new Counter({
+ name: "import_staging_processed_total",
+ help: "Total number of staged items processed",
+ labelNames: ["result"],
+});
+
+const importStagingStaleResetCounter = new Counter({
+ name: "import_staging_stale_reset_total",
+ help: "Total number of stale processing items reset to pending",
+});
+
+const importStagingInFlightGauge = new Gauge({
+ name: "import_staging_in_flight",
+ help: "Current number of in-flight items (processing + recently completed)",
+});
+
+const importSessionsGauge = new Gauge({
+ name: "import_sessions_active",
+ help: "Number of active import sessions by status",
+ labelNames: ["status"],
+});
+
+const importStagingPendingGauge = new Gauge({
+ name: "import_staging_pending_total",
+ help: "Total number of pending items in staging table",
+});
+
+const importBatchDurationHistogram = new Histogram({
+ name: "import_batch_duration_seconds",
+ help: "Time taken to process a batch of staged items",
+ buckets: [0.1, 0.5, 1, 2, 5, 10, 30],
+});
+
+function sleep(ms: number): Promise<void> {
+ return new Promise((resolve) => setTimeout(resolve, ms));
+}
+
+export class ImportWorker {
+ private running = false;
+ private pollIntervalMs = 1000;
+
+ // Backpressure settings
+ private maxInFlight = 50;
+ private batchSize = 10;
+ private staleThresholdMs = 60 * 60 * 1000; // 1 hour
+
+ async start() {
+ this.running = true;
+ let iterationCount = 0;
+
+ logger.info("[import] Starting import polling worker");
+
+ while (this.running) {
+ try {
+ // Periodically reset stale processing items (every 60 iterations ~= 1 min)
+ if (iterationCount % 60 === 0) {
+ await this.resetStaleProcessingItems();
+ }
+ iterationCount++;
+
+ // Check if any processing items have completed downstream work
+ await this.checkAndCompleteProcessingItems();
+
+ const processed = await this.processBatch();
+ if (processed === 0) {
+ await this.checkAndCompleteIdleSessions();
+ // Nothing to do, wait before polling again
+ await sleep(this.pollIntervalMs);
+ }
+ } catch (error) {
+ logger.error(`[import] Error in polling loop: ${error}`);
+ await sleep(this.pollIntervalMs);
+ }
+ }
+ }
+
+ stop() {
+ logger.info("[import] Stopping import polling worker");
+ this.running = false;
+ }
+
+ private async processBatch(): Promise<number> {
+ // 1. Check backpressure - count in-flight + recently completed items
+ const availableCapacity = await this.getAvailableCapacity();
+ importStagingInFlightGauge.set(this.maxInFlight - availableCapacity);
+
+ if (availableCapacity <= 0) {
+ // At capacity, wait before trying again
+ return 0;
+ }
+
+ // 2. Get candidate IDs with fair scheduling across users
+ const batchLimit = Math.min(this.batchSize, availableCapacity);
+ const candidateIds = await this.getNextBatchFairly(batchLimit);
+
+ if (candidateIds.length === 0) return 0;
+
+ // 3. Atomically claim rows - only rows still pending will be claimed
+ // This prevents race conditions where multiple workers select the same rows
+ const batch = await db
+ .update(importStagingBookmarks)
+ .set({ status: "processing", processingStartedAt: new Date() })
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "pending"),
+ inArray(importStagingBookmarks.id, candidateIds),
+ ),
+ )
+ .returning();
+
+ // If no rows were claimed (another worker got them first), skip processing
+ if (batch.length === 0) return 0;
+
+ const batchTimer = importBatchDurationHistogram.startTimer();
+
+ // 4. Mark session(s) as running (using claimed rows, not candidates)
+ const sessionIds = [...new Set(batch.map((b) => b.importSessionId))];
+ await db
+ .update(importSessions)
+ .set({ status: "running" })
+ .where(
+ and(
+ inArray(importSessions.id, sessionIds),
+ eq(importSessions.status, "pending"),
+ ),
+ );
+
+ // 5. Process in parallel
+ await Promise.allSettled(
+ batch.map((staged) => this.processOneBookmark(staged)),
+ );
+
+ // 6. Check if any sessions are now complete
+ await this.checkAndCompleteEmptySessions(sessionIds);
+
+ batchTimer(); // Record batch duration
+ await this.updateGauges(); // Update pending/session gauges
+
+ return batch.length;
+ }
+
+ private async updateGauges() {
+ // Update pending items gauge
+ const pending = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(eq(importStagingBookmarks.status, "pending"));
+ importStagingPendingGauge.set(pending[0]?.count ?? 0);
+
+ // Update active sessions gauge by status
+ const sessions = await db
+ .select({
+ status: importSessions.status,
+ count: count(),
+ })
+ .from(importSessions)
+ .where(
+ inArray(importSessions.status, [
+ "staging",
+ "pending",
+ "running",
+ "paused",
+ ]),
+ )
+ .groupBy(importSessions.status);
+
+ // Reset all status gauges to 0 first
+ for (const status of ["staging", "pending", "running", "paused"]) {
+ importSessionsGauge.set({ status }, 0);
+ }
+
+ // Set actual values
+ for (const s of sessions) {
+ importSessionsGauge.set({ status: s.status }, s.count);
+ }
+ }
+
+ private async checkAndCompleteIdleSessions() {
+ const sessions = await db
+ .select({ id: importSessions.id })
+ .from(importSessions)
+ .where(inArray(importSessions.status, ["pending", "running"]));
+
+ const sessionIds = sessions.map((session) => session.id);
+ if (sessionIds.length === 0) {
+ return;
+ }
+
+ await this.checkAndCompleteEmptySessions(sessionIds);
+ }
+
+ private async getNextBatchFairly(limit: number): Promise<string[]> {
+ // Query pending item IDs from active sessions, ordered by:
+ // 1. User's last-served timestamp (fairness)
+ // 2. Staging item creation time (FIFO within user)
+ // Returns only IDs - actual rows will be fetched atomically during claim
+ const results = await db
+ .select({
+ id: importStagingBookmarks.id,
+ })
+ .from(importStagingBookmarks)
+ .innerJoin(
+ importSessions,
+ eq(importStagingBookmarks.importSessionId, importSessions.id),
+ )
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "pending"),
+ inArray(importSessions.status, ["pending", "running"]),
+ ),
+ )
+ .orderBy(importSessions.lastProcessedAt, importStagingBookmarks.createdAt)
+ .limit(limit);
+
+ return results.map((r) => r.id);
+ }
+
+ private async attachBookmarkToLists(
+ caller: Awaited<ReturnType<typeof buildImpersonatingTRPCClient>>,
+ session: typeof importSessions.$inferSelect,
+ staged: typeof importStagingBookmarks.$inferSelect,
+ bookmarkId: string,
+ ): Promise<void> {
+ const listIds = new Set<string>();
+
+ if (session.rootListId) {
+ listIds.add(session.rootListId);
+ }
+
+ if (staged.listIds && staged.listIds.length > 0) {
+ for (const listId of staged.listIds) {
+ listIds.add(listId);
+ }
+ }
+
+ for (const listId of listIds) {
+ try {
+ await caller.lists.addToList({ listId, bookmarkId });
+ } catch (error) {
+ logger.warn(
+ `[import] Failed to add bookmark ${bookmarkId} to list ${listId}: ${error}`,
+ );
+ }
+ }
+ }
+
+ private async processOneBookmark(
+ staged: typeof importStagingBookmarks.$inferSelect,
+ ) {
+ const session = await db.query.importSessions.findFirst({
+ where: eq(importSessions.id, staged.importSessionId),
+ });
+
+ if (!session || session.status === "paused") {
+ // Session paused mid-batch, reset item to pending
+ await db
+ .update(importStagingBookmarks)
+ .set({ status: "pending" })
+ .where(eq(importStagingBookmarks.id, staged.id));
+ return;
+ }
+
+ try {
+ // Use existing tRPC mutation via internal caller
+ // Note: Duplicate detection is handled by createBookmark itself
+ const caller = await buildImpersonatingTRPCClient(session.userId);
+
+ // Build the request based on bookmark type
+ type CreateBookmarkInput = Parameters<
+ typeof caller.bookmarks.createBookmark
+ >[0];
+
+ const baseRequest = {
+ title: staged.title ?? undefined,
+ note: staged.note ?? undefined,
+ createdAt: staged.sourceAddedAt ?? undefined,
+ crawlPriority: "low" as const,
+ };
+
+ let bookmarkRequest: CreateBookmarkInput;
+
+ if (staged.type === "link") {
+ if (!staged.url) {
+ throw new Error("URL is required for link bookmarks");
+ }
+ bookmarkRequest = {
+ ...baseRequest,
+ type: BookmarkTypes.LINK,
+ url: staged.url,
+ };
+ } else if (staged.type === "text") {
+ if (!staged.content) {
+ throw new Error("Content is required for text bookmarks");
+ }
+ bookmarkRequest = {
+ ...baseRequest,
+ type: BookmarkTypes.TEXT,
+ text: staged.content,
+ };
+ } else {
+ // asset type - skip for now as it needs special handling
+ logger.warn(
+ `[import] Asset bookmarks not yet supported in import worker: ${staged.id}`,
+ );
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "failed",
+ result: "rejected",
+ resultReason: "Asset bookmarks not yet supported",
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return;
+ }
+
+ const result = await caller.bookmarks.createBookmark(bookmarkRequest);
+
+ // Apply tags via existing mutation (for both new and duplicate bookmarks)
+ if (staged.tags && staged.tags.length > 0) {
+ await caller.bookmarks.updateTags({
+ bookmarkId: result.id,
+ attach: staged.tags.map((t) => ({ tagName: t })),
+ detach: [],
+ });
+ }
+
+ // Handle duplicate case (createBookmark returns alreadyExists: true)
+ if (result.alreadyExists) {
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "completed",
+ result: "skipped_duplicate",
+ resultReason: "URL already exists",
+ resultBookmarkId: result.id,
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+
+ importStagingProcessedCounter.inc({ result: "skipped_duplicate" });
+ await this.attachBookmarkToLists(caller, session, staged, result.id);
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ return;
+ }
+
+ // Mark as accepted but keep in "processing" until crawl/tag is done
+ // The item will be moved to "completed" by checkAndCompleteProcessingItems()
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ result: "accepted",
+ resultBookmarkId: result.id,
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+
+ await this.attachBookmarkToLists(caller, session, staged, result.id);
+
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ } catch (error) {
+ logger.error(
+ `[import] Error processing staged item ${staged.id}: ${error}`,
+ );
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "failed",
+ result: "rejected",
+ resultReason: String(error),
+ completedAt: new Date(),
+ })
+ .where(eq(importStagingBookmarks.id, staged.id));
+
+ importStagingProcessedCounter.inc({ result: "rejected" });
+ await this.updateSessionLastProcessedAt(staged.importSessionId);
+ }
+ }
+
+ private async updateSessionLastProcessedAt(sessionId: string) {
+ await db
+ .update(importSessions)
+ .set({ lastProcessedAt: new Date() })
+ .where(eq(importSessions.id, sessionId));
+ }
+
+ private async checkAndCompleteEmptySessions(sessionIds: string[]) {
+ for (const sessionId of sessionIds) {
+ const remaining = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.importSessionId, sessionId),
+ inArray(importStagingBookmarks.status, ["pending", "processing"]),
+ ),
+ );
+
+ if (remaining[0]?.count === 0) {
+ await db
+ .update(importSessions)
+ .set({ status: "completed" })
+ .where(eq(importSessions.id, sessionId));
+ }
+ }
+ }
+
+ /**
+ * Check processing items that have a bookmark created and mark them as completed
+ * once downstream processing (crawling/tagging) is done.
+ */
+ private async checkAndCompleteProcessingItems(): Promise<number> {
+ // Find processing items where:
+ // - A bookmark was created (resultBookmarkId is set)
+ // - Downstream processing is complete (crawl/tag not pending)
+ const completedItems = await db
+ .select({
+ id: importStagingBookmarks.id,
+ importSessionId: importStagingBookmarks.importSessionId,
+ })
+ .from(importStagingBookmarks)
+ .leftJoin(
+ bookmarks,
+ eq(bookmarks.id, importStagingBookmarks.resultBookmarkId),
+ )
+ .leftJoin(
+ bookmarkLinks,
+ eq(bookmarkLinks.id, importStagingBookmarks.resultBookmarkId),
+ )
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ isNotNull(importStagingBookmarks.resultBookmarkId),
+ // Crawl is done (not pending) - either success, failure, or null (not a link)
+ or(
+ isNull(bookmarkLinks.crawlStatus),
+ eq(bookmarkLinks.crawlStatus, "success"),
+ eq(bookmarkLinks.crawlStatus, "failure"),
+ ),
+ // Tagging is done (not pending) - either success, failure, or null
+ or(
+ isNull(bookmarks.taggingStatus),
+ eq(bookmarks.taggingStatus, "success"),
+ eq(bookmarks.taggingStatus, "failure"),
+ ),
+ ),
+ );
+
+ if (completedItems.length === 0) {
+ return 0;
+ }
+
+ // Mark them as completed
+ await db
+ .update(importStagingBookmarks)
+ .set({
+ status: "completed",
+ completedAt: new Date(),
+ })
+ .where(
+ inArray(
+ importStagingBookmarks.id,
+ completedItems.map((i) => i.id),
+ ),
+ );
+
+ // Increment counter for completed items
+ importStagingProcessedCounter.inc(
+ { result: "accepted" },
+ completedItems.length,
+ );
+
+ // Check if any sessions are now complete
+ const sessionIds = [
+ ...new Set(completedItems.map((i) => i.importSessionId)),
+ ];
+ await this.checkAndCompleteEmptySessions(sessionIds);
+
+ return completedItems.length;
+ }
+
+ /**
+ * Backpressure: Calculate available capacity.
+ * Counts items currently in "processing" status, which includes both:
+ * - Items being actively imported
+ * - Items waiting for downstream crawl/tag to complete
+ */
+ private async getAvailableCapacity(): Promise<number> {
+ const processing = await db
+ .select({ count: count() })
+ .from(importStagingBookmarks)
+ .where(eq(importStagingBookmarks.status, "processing"));
+
+ return this.maxInFlight - (processing[0]?.count ?? 0);
+ }
+
+ /**
+ * Reset stale "processing" items back to "pending" so they can be retried.
+ * Called periodically to handle crashed workers or stuck items.
+ *
+ * Only resets items that don't have a resultBookmarkId - those with a bookmark
+ * are waiting for downstream processing (crawl/tag), not stale.
+ */
+ private async resetStaleProcessingItems(): Promise<number> {
+ const staleThreshold = new Date(Date.now() - this.staleThresholdMs);
+
+ const staleItems = await db
+ .select({ id: importStagingBookmarks.id })
+ .from(importStagingBookmarks)
+ .where(
+ and(
+ eq(importStagingBookmarks.status, "processing"),
+ lt(importStagingBookmarks.processingStartedAt, staleThreshold),
+ // Only reset items that haven't created a bookmark yet
+ // Items with a bookmark are waiting for downstream, not stale
+ isNull(importStagingBookmarks.resultBookmarkId),
+ ),
+ );
+
+ if (staleItems.length > 0) {
+ logger.warn(
+ `[import] Resetting ${staleItems.length} stale processing items`,
+ );
+
+ await db
+ .update(importStagingBookmarks)
+ .set({ status: "pending", processingStartedAt: null })
+ .where(
+ inArray(
+ importStagingBookmarks.id,
+ staleItems.map((i) => i.id),
+ ),
+ );
+
+ importStagingStaleResetCounter.inc(staleItems.length);
+ return staleItems.length;
+ }
+
+ return 0;
+ }
+}