From 86a4b3966504507afd6c3adbb6a1246cafd39d83 Mon Sep 17 00:00:00 2001
From: Mohamed Bassem
Date: Sat, 29 Nov 2025 14:53:31 +0000
Subject: feat: Add automated bookmark backup feature (#2182)

* feat: Add automated bookmark backup system

Implements a comprehensive automated backup feature for user bookmarks
with the following capabilities:

Database Schema:
- Add backupSettings table to store user backup preferences (enabled, frequency, retention)
- Add backups table to track backup records with status and metadata
- Add BACKUP asset type for storing compressed backup files
- Add migration 0066_add_backup_tables.sql

Background Workers:
- Implement BackupSchedulingWorker cron job (runs daily at midnight UTC)
- Create BackupWorker to process individual backup jobs
- Deterministic scheduling spreads backup jobs across 24 hours based on user ID hash
- Support for daily and weekly backup frequencies
- Automated retention cleanup to delete old backups based on user settings

Export & Compression:
- Reuse existing export functionality for bookmark data
- Compress exports using Node.js built-in zlib (gzip level 9)
- Store compressed backups as assets with proper metadata
- Track backup size and bookmark count for statistics

tRPC API:
- backups.getSettings - Retrieve user backup configuration
- backups.updateSettings - Update backup preferences
- backups.list - List all user backups with metadata
- backups.get - Get specific backup details
- backups.delete - Delete a backup
- backups.download - Download backup file (base64 encoded)
- backups.triggerBackup - Manually trigger backup creation

UI Components:
- BackupSettings component with configuration form
- Enable/disable automatic backups toggle
- Frequency selection (daily/weekly)
- Retention period configuration (1-365 days)
- Backup list table with download and delete actions
- Manual backup trigger button
- Display backup stats (size, bookmark count, status)
- Added backups page to settings navigation

Technical Details:
- Uses Restate queue system for distributed job processing
- Implements idempotency keys to prevent duplicate backups
- Background worker concurrency: 2 jobs at a time
- 10-minute timeout for large backup exports
- Proper error handling and logging throughout
- Type-safe implementation with Zod schemas

* refactor: simplify backup settings and asset handling

- Move backup settings from separate table to user table columns
- Update BackupSettings model to use static methods with users table
- Remove download mutation in favor of direct asset links
- Implement proper quota checks using QuotaService.checkStorageQuota
- Update UI to use new property names and direct asset downloads
- Update shared types to match new schema

Key changes:
- backupSettingsTable removed, settings now in users table
- Backup downloads use direct /api/assets/{id} links
- Quota properly validated before creating backup assets
- Cleaner separation of concerns in tRPC models

* migration
* use zip instead of gzip
* fix drizzle
* fix settings
* streaming json
* remove more dead code
* add e2e tests
* return backup
* poll for backups
* more fixes
* more fixes
* fix test
* fix UI
* fix delete asset
* fix ui
* redirect for backup download
* cleanups
* fix idempotency
* fix tests
* add ratelimit
* add error handling for background backups
* i18n
* model changes

---------

Co-authored-by: Claude
---
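Review note: the tRPC procedures listed above live under packages/trpc, which this 'apps'-filtered view omits. A rough sketch of the surface the new UI calls against — procedure names and payload shapes are taken from the call sites in this patch, while the router plumbing (file path, `router`/`authedProcedure` helpers, `Backup.getAll`) is assumed:

// packages/trpc/routers/backups.ts (sketch only; not part of this diff)
import { z } from "zod";

import { BackupQueue } from "@karakeep/shared-server";

import { Backup } from "../models/backups";
import { authedProcedure, router } from "../index"; // assumed helpers

export const backupsAppRouter = router({
  list: authedProcedure.query(async ({ ctx }) => ({
    backups: await Backup.getAll(ctx), // hypothetical accessor
  })),
  delete: authedProcedure
    .input(z.object({ backupId: z.string() }))
    .mutation(async ({ ctx, input }) => {
      const backup = await Backup.fromId(ctx, input.backupId);
      await backup.delete();
    }),
  triggerBackup: authedProcedure.mutation(async ({ ctx }) => {
    // Creates a pending backup row, enqueues a job, and returns the backup
    // so the UI can start polling (per the "return backup" commit).
    const backup = await Backup.create(ctx);
    await BackupQueue.enqueue({ userId: ctx.user.id, backupId: backup.id });
    return backup;
  }),
});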
 apps/web/app/settings/backups/page.tsx          |  17 +
 apps/web/app/settings/layout.tsx                |   6 +
 apps/web/components/settings/BackupSettings.tsx | 423 +++++++++++++++++++++++
 apps/web/lib/i18n/locales/en/translation.json   |  49 +++
 apps/web/lib/userSettings.tsx                   |   3 +
 apps/workers/index.ts                           |   9 +
 apps/workers/package.json                       |   2 +
 apps/workers/workers/backupWorker.ts            | 431 ++++++++++++++++++++++++
 apps/workers/workers/utils/fetchBookmarks.ts    | 131 +++++++
 9 files changed, 1071 insertions(+)
 create mode 100644 apps/web/app/settings/backups/page.tsx
 create mode 100644 apps/web/components/settings/BackupSettings.tsx
 create mode 100644 apps/workers/workers/backupWorker.ts
 create mode 100644 apps/workers/workers/utils/fetchBookmarks.ts
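Note: the "deterministic scheduling" described in the commit message is easiest to see with a small worked example. This mirrors the logic in backupWorker.ts further down; the user ID here is made up:

// Sketch of the load-spreading computation used by the scheduler below.
import { createHash } from "node:crypto";

const userId = "cku7example"; // hypothetical user ID
const hash = createHash("sha256").update(userId).digest("hex");
const hashNum = parseInt(hash.substring(0, 8), 16); // first 32 bits as an int
const delayMs = hashNum % 86_400_000; // stable offset within one day
console.log(
  `backup fires ~${Math.round(delayMs / 60_000)} minutes after midnight UTC`,
);

Because the hash is a pure function of the user ID, the same user always lands in the same time slot, while different users spread roughly uniformly over the 24-hour window.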
diff --git a/apps/web/app/settings/backups/page.tsx b/apps/web/app/settings/backups/page.tsx
new file mode 100644
index 00000000..fc263089
--- /dev/null
+++ b/apps/web/app/settings/backups/page.tsx
@@ -0,0 +1,17 @@
+"use client";
+
+import BackupSettings from "@/components/settings/BackupSettings";
+import { useTranslation } from "@/lib/i18n/client";
+
+export default function BackupsPage() {
+  const { t } = useTranslation();
+  return (
+    <>
+      {t("settings.backups.page_title")}
+      {t("settings.backups.page_description")}
+      <BackupSettings />
+    </>
+  );
+}
diff --git a/apps/web/app/settings/layout.tsx b/apps/web/app/settings/layout.tsx
index 982ac61a..1c7d25ac 100644
--- a/apps/web/app/settings/layout.tsx
+++ b/apps/web/app/settings/layout.tsx
@@ -7,6 +7,7 @@ import { TFunction } from "i18next";
 import {
   ArrowLeft,
   BarChart3,
+  CloudDownload,
   CreditCard,
   Download,
   GitBranch,
@@ -67,6 +68,11 @@ const settingsSidebarItems = (
       icon: ,
       path: "/settings/feeds",
     },
+    {
+      name: t("settings.backups.backups"),
+      icon: ,
+      path: "/settings/backups",
+    },
     {
       name: t("settings.import.import_export"),
       icon: ,
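Note: the icon JSX in the hunk above was stripped during extraction (the new entry presumably renders the imported CloudDownload icon). Separately, the component below validates its form against shared Zod schemas that live in packages/shared and are not shown in this 'apps'-only view. A plausible shape, with field names taken from how BackupSettings.tsx consumes them; exact modifiers are assumptions:

// packages/shared/types (sketch; not in this diff)
import { z } from "zod";

export const zBackupSchema = z.object({
  id: z.string(),
  createdAt: z.date(),
  status: z.enum(["pending", "success", "failure"]),
  bookmarkCount: z.number(),
  size: z.number(),
  assetId: z.string().nullable(),
  errorMessage: z.string().nullable(),
});

export const zUpdateBackupSettingsSchema = z.object({
  backupsEnabled: z.boolean(),
  backupsFrequency: z.enum(["daily", "weekly"]),
  backupsRetentionDays: z.number().int().min(1).max(365), // 1-365 days per commit message
});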
diff --git a/apps/web/components/settings/BackupSettings.tsx b/apps/web/components/settings/BackupSettings.tsx
new file mode 100644
index 00000000..18a80993
--- /dev/null
+++ b/apps/web/components/settings/BackupSettings.tsx
@@ -0,0 +1,423 @@
+"use client";
+
+import React from "react";
+import Link from "next/link";
+import { ActionButton } from "@/components/ui/action-button";
+import {
+  Form,
+  FormControl,
+  FormDescription,
+  FormField,
+  FormItem,
+  FormLabel,
+  FormMessage,
+} from "@/components/ui/form";
+import { FullPageSpinner } from "@/components/ui/full-page-spinner";
+import { Input } from "@/components/ui/input";
+import {
+  Select,
+  SelectContent,
+  SelectItem,
+  SelectTrigger,
+  SelectValue,
+} from "@/components/ui/select";
+import { Switch } from "@/components/ui/switch";
+import { toast } from "@/components/ui/use-toast";
+import { useTranslation } from "@/lib/i18n/client";
+import { api } from "@/lib/trpc";
+import { useUserSettings } from "@/lib/userSettings";
+import { zodResolver } from "@hookform/resolvers/zod";
+import {
+  CheckCircle,
+  Download,
+  Play,
+  Save,
+  Trash2,
+  XCircle,
+} from "lucide-react";
+import { useForm } from "react-hook-form";
+import { z } from "zod";
+
+import { useUpdateUserSettings } from "@karakeep/shared-react/hooks/users";
+import { zBackupSchema } from "@karakeep/shared/types/backups";
+import { zUpdateBackupSettingsSchema } from "@karakeep/shared/types/users";
+import { getAssetUrl } from "@karakeep/shared/utils/assetUtils";
+
+import ActionConfirmingDialog from "../ui/action-confirming-dialog";
+import { Button } from "../ui/button";
+import {
+  Table,
+  TableBody,
+  TableCell,
+  TableHead,
+  TableHeader,
+  TableRow,
+} from "../ui/table";
+import { Tooltip, TooltipContent, TooltipTrigger } from "../ui/tooltip";
+
+function BackupConfigurationForm() {
+  const { t } = useTranslation();
+
+  const settings = useUserSettings();
+  const { mutate: updateSettings, isPending: isUpdating } =
+    useUpdateUserSettings({
+      onSuccess: () => {
+        toast({
+          description: t("settings.info.user_settings.user_settings_updated"),
+        });
+      },
+      onError: () => {
+        toast({
+          description: t("common.something_went_wrong"),
+          variant: "destructive",
+        });
+      },
+    });
+
+  const form = useForm<z.infer<typeof zUpdateBackupSettingsSchema>>({
+    resolver: zodResolver(zUpdateBackupSettingsSchema),
+    values: settings
+      ? {
+          backupsEnabled: settings.backupsEnabled,
+          backupsFrequency: settings.backupsFrequency,
+          backupsRetentionDays: settings.backupsRetentionDays,
+        }
+      : undefined,
+  });
+
+  return (
+    <>
+      {t("settings.backups.configuration.title")}
+      <Form {...form}>
+        <form
+          onSubmit={form.handleSubmit((value) => {
+            updateSettings(value);
+          })}
+        >
+          <FormField
+            control={form.control}
+            name="backupsEnabled"
+            render={({ field }) => (
+              <FormItem>
+                <FormLabel>
+                  {t(
+                    "settings.backups.configuration.enable_automatic_backups",
+                  )}
+                </FormLabel>
+                <FormDescription>
+                  {t(
+                    "settings.backups.configuration.enable_automatic_backups_description",
+                  )}
+                </FormDescription>
+                <FormControl>
+                  <Switch
+                    checked={field.value}
+                    onCheckedChange={field.onChange}
+                  />
+                </FormControl>
+              </FormItem>
+            )}
+          />
+
+          <FormField
+            control={form.control}
+            name="backupsFrequency"
+            render={({ field }) => (
+              <FormItem>
+                <FormLabel>
+                  {t("settings.backups.configuration.backup_frequency")}
+                </FormLabel>
+                <Select onValueChange={field.onChange} value={field.value}>
+                  <FormControl>
+                    <SelectTrigger>
+                      <SelectValue
+                        placeholder={t(
+                          "settings.backups.configuration.select_frequency",
+                        )}
+                      />
+                    </SelectTrigger>
+                  </FormControl>
+                  <SelectContent>
+                    <SelectItem value="daily">
+                      {t("settings.backups.configuration.frequency.daily")}
+                    </SelectItem>
+                    <SelectItem value="weekly">
+                      {t("settings.backups.configuration.frequency.weekly")}
+                    </SelectItem>
+                  </SelectContent>
+                </Select>
+                <FormDescription>
+                  {t(
+                    "settings.backups.configuration.backup_frequency_description",
+                  )}
+                </FormDescription>
+                <FormMessage />
+              </FormItem>
+            )}
+          />
+
+          <FormField
+            control={form.control}
+            name="backupsRetentionDays"
+            render={({ field }) => (
+              <FormItem>
+                <FormLabel>
+                  {t("settings.backups.configuration.retention_period")}
+                </FormLabel>
+                <FormControl>
+                  <Input
+                    type="number"
+                    {...field}
+                    onChange={(e) => field.onChange(parseInt(e.target.value))}
+                  />
+                </FormControl>
+                <FormDescription>
+                  {t(
+                    "settings.backups.configuration.retention_period_description",
+                  )}
+                </FormDescription>
+                <FormMessage />
+              </FormItem>
+            )}
+          />
+
+          <ActionButton type="submit" loading={isUpdating}>
+            <Save />
+            {t("settings.backups.configuration.save_settings")}
+          </ActionButton>
+        </form>
+      </Form>
+    </>
+  );
+}
+
+function BackupRow({ backup }: { backup: z.infer<typeof zBackupSchema> }) {
+  const { t } = useTranslation();
+  const apiUtils = api.useUtils();
+
+  const { mutate: deleteBackup, isPending: isDeleting } =
+    api.backups.delete.useMutation({
+      onSuccess: () => {
+        toast({
+          description: t("settings.backups.toasts.backup_deleted"),
+        });
+        apiUtils.backups.list.invalidate();
+      },
+      onError: (error) => {
+        toast({
+          description: `Error: ${error.message}`,
+          variant: "destructive",
+        });
+      },
+    });
+
+  const formatSize = (bytes: number) => {
+    if (bytes < 1024) return `${bytes} B`;
+    if (bytes < 1024 * 1024) return `${(bytes / 1024).toFixed(2)} KB`;
+    return `${(bytes / (1024 * 1024)).toFixed(2)} MB`;
+  };
+
+  return (
+    <TableRow>
+      <TableCell>{backup.createdAt.toLocaleString()}</TableCell>
+      <TableCell>
+        {backup.status === "pending"
+          ? "-"
+          : backup.bookmarkCount.toLocaleString()}
+      </TableCell>
+      <TableCell>
+        {backup.status === "pending" ? "-" : formatSize(backup.size)}
+      </TableCell>
+      <TableCell>
+        {backup.status === "success" ? (
+          <>
+            <CheckCircle />
+            {t("settings.backups.list.status.success")}
+          </>
+        ) : backup.status === "failure" ? (
+          <Tooltip>
+            <TooltipTrigger>
+              <XCircle />
+              {t("settings.backups.list.status.failed")}
+            </TooltipTrigger>
+            <TooltipContent>{backup.errorMessage}</TooltipContent>
+          </Tooltip>
+        ) : (
+          <>{t("settings.backups.list.status.pending")}</>
+        )}
+      </TableCell>
+      <TableCell>
+        {backup.assetId && (
+          <Link href={getAssetUrl(backup.assetId)}>
+            <Download />
+            {t("settings.backups.list.actions.download_backup")}
+          </Link>
+        )}
+        <ActionConfirmingDialog
+          title={t("settings.backups.dialogs.delete_backup_title")}
+          description={t("settings.backups.dialogs.delete_backup_description")}
+          actionButton={() => (
+            <ActionButton
+              variant="destructive"
+              loading={isDeleting}
+              onClick={() => deleteBackup({ backupId: backup.id })}
+              className="items-center"
+              type="button"
+            >
+              <Trash2 />
+              {t("settings.backups.list.actions.delete_backup")}
+            </ActionButton>
+          )}
+        >
+          <Button variant="ghost">
+            <Trash2 />
+          </Button>
+        </ActionConfirmingDialog>
+      </TableCell>
+    </TableRow>
+  );
+}
+
+function BackupsList() {
+  const { t } = useTranslation();
+  const apiUtils = api.useUtils();
+  const { data: backups, isLoading } = api.backups.list.useQuery(undefined, {
+    refetchInterval: (query) => {
+      const data = query.state.data;
+      // Poll every 3 seconds if there's a pending backup, otherwise don't poll
+      return data?.backups.some((backup) => backup.status === "pending")
+        ? 3000
+        : false;
+    },
+  });
+
+  const { mutate: triggerBackup, isPending: isTriggering } =
+    api.backups.triggerBackup.useMutation({
+      onSuccess: () => {
+        toast({
+          description: t("settings.backups.toasts.backup_queued"),
+        });
+        apiUtils.backups.list.invalidate();
+      },
+      onError: (error) => {
+        toast({
+          description: `Error: ${error.message}`,
+          variant: "destructive",
+        });
+      },
+    });
+
+  return (
+    <>
+      {t("settings.backups.list.title")}
+      <ActionButton
+        onClick={() => triggerBackup()}
+        loading={isTriggering}
+        variant="default"
+        className="items-center"
+      >
+        <Play />
+        {t("settings.backups.list.create_backup_now")}
+      </ActionButton>
+
+      {isLoading && <FullPageSpinner />}
+
+      {backups && backups.backups.length === 0 && (
+        <>{t("settings.backups.list.no_backups")}</>
+      )}
+
+      {backups && backups.backups.length > 0 && (
+        <Table>
+          <TableHeader>
+            <TableRow>
+              <TableHead>
+                {t("settings.backups.list.table.created_at")}
+              </TableHead>
+              <TableHead>
+                {t("settings.backups.list.table.bookmarks")}
+              </TableHead>
+              <TableHead>{t("settings.backups.list.table.size")}</TableHead>
+              <TableHead>{t("settings.backups.list.table.status")}</TableHead>
+              <TableHead>
+                {t("settings.backups.list.table.actions")}
+              </TableHead>
+            </TableRow>
+          </TableHeader>
+          <TableBody>
+            {backups.backups.map((backup) => (
+              <BackupRow key={backup.id} backup={backup} />
+            ))}
+          </TableBody>
+        </Table>
+      )}
+    </>
+  );
+}
+
+export default function BackupSettings() {
+  return (
+    <>
+      <BackupConfigurationForm />
+      <BackupsList />
+    </>
+  );
+}
diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json
index bc69f710..43d45cb5 100644
--- a/apps/web/lib/i18n/locales/en/translation.json
+++ b/apps/web/lib/i18n/locales/en/translation.json
@@ -359,6 +359,55 @@
       "delete_dialog_title": "Delete Import Session",
       "delete_dialog_description": "Are you sure you want to delete \"{{name}}\"? This action cannot be undone. The bookmarks themselves will not be deleted.",
       "delete_session": "Delete Session"
+    },
+    "backups": {
+      "backups": "Backups",
+      "page_title": "Backups",
+      "page_description": "Automatically create and manage backups of your bookmarks. Backups are compressed and stored securely.",
+      "configuration": {
+        "title": "Backup Configuration",
+        "enable_automatic_backups": "Enable Automatic Backups",
+        "enable_automatic_backups_description": "Automatically create backups of your bookmarks",
+        "backup_frequency": "Backup Frequency",
+        "backup_frequency_description": "How often backups should be created",
+        "retention_period": "Retention Period (days)",
+        "retention_period_description": "How many days to keep backups before deleting them",
+        "frequency": {
+          "daily": "Daily",
+          "weekly": "Weekly"
+        },
+        "select_frequency": "Select frequency",
+        "save_settings": "Save Settings"
+      },
+      "list": {
+        "title": "Your Backups",
+        "create_backup_now": "Create Backup Now",
+        "no_backups": "You don't have any backups yet. Enable automatic backups or create one manually.",
+        "table": {
+          "created_at": "Created At",
+          "bookmarks": "Bookmarks",
+          "size": "Size",
+          "status": "Status",
+          "actions": "Actions"
+        },
+        "status": {
+          "success": "Success",
+          "failed": "Failed",
+          "pending": "Pending"
+        },
+        "actions": {
+          "download_backup": "Download Backup",
+          "delete_backup": "Delete Backup"
+        }
+      },
+      "dialogs": {
+        "delete_backup_title": "Delete Backup?",
+        "delete_backup_description": "Are you sure you want to delete this backup? This action cannot be undone."
+      },
+      "toasts": {
+        "backup_queued": "Backup job has been queued! It will be processed shortly.",
+        "backup_deleted": "Backup has been deleted!"
+      }
     }
   },
   "admin": {
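Note: the download links in BackupRow above go through getAssetUrl from @karakeep/shared/utils/assetUtils, which is not part of this diff. Given the commit message's statement that downloads use direct /api/assets/{id} links, the helper presumably reduces to something like this (a sketch, not the actual implementation):

// packages/shared/utils/assetUtils.ts (sketch; helper not shown in this diff)
export function getAssetUrl(assetId: string): string {
  return `/api/assets/${assetId}`;
}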
diff --git a/apps/web/lib/userSettings.tsx b/apps/web/lib/userSettings.tsx
index 2ab7db2b..c7a133b7 100644
--- a/apps/web/lib/userSettings.tsx
+++ b/apps/web/lib/userSettings.tsx
@@ -10,6 +10,9 @@ export const UserSettingsContext = createContext({
   bookmarkClickAction: "open_original_link",
   archiveDisplayBehaviour: "show",
   timezone: "UTC",
+  backupsEnabled: false,
+  backupsFrequency: "daily",
+  backupsRetentionDays: 7,
 });
 
 export function UserSettingsContextProvider({
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 38f831d7..b605b50f 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,6 +13,7 @@ import logger from "@karakeep/shared/logger";
 import { shutdownPromise } from "./exit";
 import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
 import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
+import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
 import { CrawlerWorker } from "./workers/crawlerWorker";
 import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
 import { OpenAiWorker } from "./workers/inference/inferenceWorker";
@@ -31,6 +32,7 @@ const workerBuilders = {
   assetPreprocessing: () => AssetPreprocessingWorker.build(),
   webhook: () => WebhookWorker.build(),
   ruleEngine: () => RuleEngineWorker.build(),
+  backup: () => BackupWorker.build(),
 } as const;
 
 type WorkerName = keyof typeof workerBuilders;
@@ -69,6 +71,10 @@ async function main() {
     FeedRefreshingWorker.start();
   }
 
+  if (workers.some((w) => w.name === "backup")) {
+    BackupSchedulingWorker.start();
+  }
+
   await Promise.any([
     Promise.all([
       ...workers.map(({ worker }) => worker.run()),
@@ -84,6 +90,9 @@ async function main() {
   if (workers.some((w) => w.name === "feed")) {
     FeedRefreshingWorker.stop();
   }
+  if (workers.some((w) => w.name === "backup")) {
+    BackupSchedulingWorker.stop();
+  }
   for (const { worker } of workers) {
     worker.stop();
   }
diff --git a/apps/workers/package.json b/apps/workers/package.json
index fb10583b..1b5b2c95 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -15,6 +15,7 @@
     "@karakeep/tsconfig": "workspace:^0.1.0",
     "@mozilla/readability": "^0.6.0",
     "@tsconfig/node22": "^22.0.0",
+    "archiver": "^7.0.1",
    "async-mutex": "^0.4.1",
     "dompurify": "^3.2.4",
     "dotenv": "^16.4.1",
@@ -57,6 +58,7 @@
   },
   "devDependencies": {
     "@karakeep/prettier-config": "workspace:^0.1.0",
+    "@types/archiver": "^7.0.0",
     "@types/jsdom": "^21.1.6",
     "@types/node-cron": "^3.0.11",
     "tsdown": "^0.12.9"
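Note: BackupQueue and ZBackupRequest come from @karakeep/shared-server, which is outside this 'apps'-filtered view. The payload and enqueue options below are inferred from the call sites in backupWorker.ts; the queue-construction API itself is an assumption:

// @karakeep/shared-server (sketch; not part of this diff)
import { z } from "zod";

export const zBackupRequestSchema = z.object({
  userId: z.string(),
  // Optional: the worker fills this in once the backup row exists, so that
  // retries and the onError handler can mark the same record as failed.
  backupId: z.string().optional(),
});
export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>;

// Conceptually, BackupQueue is a named durable queue whose enqueue() accepts
// the payload above plus { delayMs, idempotencyKey }, so a given user/day
// pair is only ever scheduled once even if the cron job reruns.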
"@karakeep/shared/logger"; +import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; +import { AuthedContext } from "@karakeep/trpc"; +import { Backup } from "@karakeep/trpc/models/backups"; + +import { buildImpersonatingAuthedContext } from "../trpc"; +import { fetchBookmarksInBatches } from "./utils/fetchBookmarks"; + +// Run daily at midnight UTC +export const BackupSchedulingWorker = cron.schedule( + "0 0 * * *", + async () => { + logger.info("[backup] Scheduling daily backup jobs ..."); + try { + const usersWithBackups = await db.query.users.findMany({ + columns: { + id: true, + backupsFrequency: true, + }, + where: eq(users.backupsEnabled, true), + }); + + logger.info( + `[backup] Found ${usersWithBackups.length} users with backups enabled`, + ); + + const now = new Date(); + const currentDay = now.toISOString().split("T")[0]; // YYYY-MM-DD + + for (const user of usersWithBackups) { + // Deterministically schedule backups throughout the day based on user ID + // This spreads the load across 24 hours + const hash = createHash("sha256").update(user.id).digest("hex"); + const hashNum = parseInt(hash.substring(0, 8), 16); + + // For daily: schedule within 24 hours + // For weekly: only schedule on the user's designated day of week + let shouldSchedule = false; + let delayMs = 0; + + if (user.backupsFrequency === "daily") { + shouldSchedule = true; + // Spread across 24 hours (86400000 ms) + delayMs = hashNum % 86400000; + } else if (user.backupsFrequency === "weekly") { + // Use hash to determine day of week (0-6) + const userDayOfWeek = hashNum % 7; + const currentDayOfWeek = now.getDay(); + + if (userDayOfWeek === currentDayOfWeek) { + shouldSchedule = true; + // Spread across 24 hours + delayMs = hashNum % 86400000; + } + } + + if (shouldSchedule) { + const idempotencyKey = `${user.id}-${currentDay}`; + + await BackupQueue.enqueue( + { + userId: user.id, + }, + { + delayMs, + idempotencyKey, + }, + ); + + logger.info( + `[backup] Scheduled backup for user ${user.id} with delay ${Math.round(delayMs / 1000 / 60)} minutes`, + ); + } + } + + logger.info("[backup] Finished scheduling backup jobs"); + } catch (error) { + logger.error(`[backup] Error scheduling backup jobs: ${error}`); + } + }, + { + runOnInit: false, + scheduled: false, + }, +); + +export class BackupWorker { + static async build() { + logger.info("Starting backup worker ..."); + const worker = (await getQueueClient())!.createRunner( + BackupQueue, + { + run: run, + onComplete: async (job) => { + workerStatsCounter.labels("backup", "completed").inc(); + const jobId = job.id; + logger.info(`[backup][${jobId}] Completed successfully`); + }, + onError: async (job) => { + workerStatsCounter.labels("backup", "failed").inc(); + if (job.numRetriesLeft == 0) { + workerStatsCounter.labels("backup", "failed_permanent").inc(); + } + const jobId = job.id; + logger.error( + `[backup][${jobId}] Backup job failed: ${job.error}\n${job.error?.stack}`, + ); + + // Mark backup as failed + if (job.data?.backupId && job.data?.userId) { + try { + const authCtx = await buildImpersonatingAuthedContext( + job.data.userId, + ); + const backup = await Backup.fromId(authCtx, job.data.backupId); + await backup.update({ + status: "failure", + errorMessage: job.error?.message || "Unknown error", + }); + } catch (err) { + logger.error( + `[backup][${jobId}] Failed to mark backup as failed: ${err}`, + ); + } + } + }, + }, + { + concurrency: 2, // Process 2 backups at a time + pollIntervalMs: 5000, + timeoutSecs: 600, // 10 minutes timeout 
+
+async function run(req: DequeuedJob<ZBackupRequest>) {
+  const jobId = req.id;
+  const userId = req.data.userId;
+  const backupId = req.data.backupId;
+
+  logger.info(`[backup][${jobId}] Starting backup for user ${userId} ...`);
+
+  // Fetch user settings to check if backups are enabled and get retention
+  const user = await db.query.users.findFirst({
+    columns: {
+      id: true,
+      backupsRetentionDays: true,
+    },
+    where: eq(users.id, userId),
+  });
+
+  if (!user) {
+    logger.info(`[backup][${jobId}] User not found: ${userId}. Skipping.`);
+    return;
+  }
+
+  const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
+  const tempJsonPath = join(
+    tmpdir(),
+    `karakeep-backup-${userId}-${timestamp}.json`,
+  );
+  const tempZipPath = join(
+    tmpdir(),
+    `karakeep-backup-${userId}-${timestamp}.zip`,
+  );
+
+  let backup: Backup | null = null;
+
+  try {
+    // Step 1: Stream bookmarks to JSON file
+    const ctx = await buildImpersonatingAuthedContext(userId);
+    const backupInstance = await (backupId
+      ? Backup.fromId(ctx, backupId)
+      : Backup.create(ctx));
+    backup = backupInstance;
+    // Ensure backupId is attached to job data so error handler can mark failure.
+    req.data.backupId = backupInstance.id;
+
+    const bookmarkCount = await streamBookmarksToJsonFile(
+      ctx,
+      tempJsonPath,
+      jobId,
+    );
+
+    logger.info(
+      `[backup][${jobId}] Streamed ${bookmarkCount} bookmarks to JSON file`,
+    );
+
+    // Step 2: Compress the JSON file as zip
+    logger.info(`[backup][${jobId}] Compressing JSON file as zip ...`);
+    await createZipArchiveFromFile(tempJsonPath, timestamp, tempZipPath);
+
+    const fileStats = await stat(tempZipPath);
+    const compressedSize = fileStats.size;
+    const jsonStats = await stat(tempJsonPath);
+
+    logger.info(
+      `[backup][${jobId}] Compressed ${jsonStats.size} bytes to ${compressedSize} bytes`,
+    );
+
+    // Step 3: Check quota and store as asset
+    const quotaApproval = await QuotaService.checkStorageQuota(
+      db,
+      userId,
+      compressedSize,
+    );
+    const assetId = createId();
+    const fileName = `karakeep-backup-${timestamp}.zip`;
+
+    // Step 4: Create asset record
+    await db.insert(assets).values({
+      id: assetId,
+      assetType: AssetTypes.BACKUP,
+      size: compressedSize,
+      contentType: "application/zip",
+      fileName: fileName,
+      bookmarkId: null,
+      userId: userId,
+    });
+    await saveAssetFromFile({
+      userId,
+      assetId,
+      assetPath: tempZipPath,
+      metadata: {
+        contentType: "application/zip",
+        fileName,
+      },
+      quotaApproved: quotaApproval,
+    });
+
+    // Step 5: Update backup record
+    await backupInstance.update({
+      size: compressedSize,
+      bookmarkCount: bookmarkCount,
+      status: "success",
+      assetId,
+    });
+
+    logger.info(
+      `[backup][${jobId}] Successfully created backup for user ${userId} with ${bookmarkCount} bookmarks (${compressedSize} bytes)`,
+    );
+
+    // Step 6: Clean up old backups based on retention
+    await cleanupOldBackups(ctx, user.backupsRetentionDays, jobId);
+  } catch (error) {
+    if (backup) {
+      try {
+        await backup.update({
+          status: "failure",
+          errorMessage:
+            error instanceof Error ? error.message : "Unknown error",
+        });
+      } catch (updateError) {
+        logger.error(
+          `[backup][${jobId}] Failed to mark backup ${backup.id} as failed: ${updateError}`,
+        );
+      }
+    }
+    throw error;
+  } finally {
+    // Final cleanup of temporary files
+    try {
+      await unlink(tempJsonPath);
+    } catch {
+      // Ignore errors during cleanup
+    }
+    try {
+      await unlink(tempZipPath);
+    } catch {
+      // Ignore errors during cleanup
+    }
+  }
+}
+
+/**
+ * Streams bookmarks to a JSON file in batches to avoid loading everything into memory
+ * @returns The total number of bookmarks written
+ */
+async function streamBookmarksToJsonFile(
+  ctx: AuthedContext,
+  outputPath: string,
+  jobId: string,
+): Promise<number> {
+  return new Promise<number>((resolve, reject) => {
+    const writeStream = createWriteStream(outputPath, { encoding: "utf-8" });
+    let bookmarkCount = 0;
+    let isFirst = true;
+
+    writeStream.on("error", reject);
+
+    // Start JSON structure
+    writeStream.write('{"bookmarks":[');
+
+    (async () => {
+      try {
+        for await (const batch of fetchBookmarksInBatches(ctx, 1000)) {
+          for (const bookmark of batch) {
+            const exported = toExportFormat(bookmark);
+            if (exported.content !== null) {
+              // Add comma separator for all items except the first
+              if (!isFirst) {
+                writeStream.write(",");
+              }
+              writeStream.write(JSON.stringify(exported));
+              isFirst = false;
+              bookmarkCount++;
+            }
+          }
+
+          // Log progress every batch
+          if (bookmarkCount % 1000 === 0) {
+            logger.info(
+              `[backup][${jobId}] Streamed ${bookmarkCount} bookmarks so far...`,
+            );
+          }
+        }
+
+        // Close JSON structure
+        writeStream.write("]}");
+        writeStream.end();
+
+        writeStream.on("finish", () => {
+          resolve(bookmarkCount);
+        });
+      } catch (error) {
+        writeStream.destroy();
+        reject(error);
+      }
+    })();
+  });
+}
+
+/**
+ * Creates a zip archive from a JSON file (streaming from disk instead of memory)
+ */
+async function createZipArchiveFromFile(
+  jsonFilePath: string,
+  timestamp: string,
+  outputPath: string,
+): Promise<void> {
+  return new Promise((resolve, reject) => {
+    const archive = archiver("zip", {
+      zlib: { level: 9 }, // Maximum compression
+    });
+
+    const output = createWriteStream(outputPath);
+
+    output.on("close", () => {
+      resolve();
+    });
+
+    output.on("error", reject);
+    archive.on("error", reject);
+
+    // Pipe archive data to the file
+    archive.pipe(output);
+
+    // Add the JSON file to the zip (streaming from disk)
+    const jsonFileName = `karakeep-backup-${timestamp}.json`;
+    archive.file(jsonFilePath, { name: jsonFileName });
+
+    archive.finalize();
+  });
+}
+
+/**
+ * Cleans up old backups based on retention policy
+ */
+async function cleanupOldBackups(
+  ctx: AuthedContext,
+  retentionDays: number,
+  jobId: string,
+) {
+  try {
+    logger.info(
+      `[backup][${jobId}] Cleaning up backups older than ${retentionDays} days for user ${ctx.user.id} ...`,
+    );
+
+    const oldBackups = await Backup.findOldBackups(ctx, retentionDays);
+
+    if (oldBackups.length === 0) {
+      return;
+    }
+
+    logger.info(
+      `[backup][${jobId}] Found ${oldBackups.length} old backups to delete for user ${ctx.user.id}`,
+    );
+
+    // Delete each backup using the model's delete method
+    for (const backup of oldBackups) {
+      try {
+        await backup.delete();
+        logger.info(
+          `[backup][${jobId}] Deleted backup ${backup.id} for user ${ctx.user.id}`,
+        );
+      } catch (error) {
+        logger.warn(
+          `[backup][${jobId}] Failed to delete backup ${backup.id}: ${error}`,
+        );
+      }
+    }
+
+    logger.info(
+      `[backup][${jobId}] Successfully cleaned up ${oldBackups.length} old backups for user ${ctx.user.id}`,
+    );
+  } catch (error) {
+    logger.error(
+      `[backup][${jobId}] Error cleaning up old backups for user ${ctx.user.id}: ${error}`,
+    );
+  }
+}
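Note: the Backup model used throughout the worker above lives in @karakeep/trpc/models/backups, outside this 'apps'-filtered view. Its surface, as a sketch with method names and shapes taken from the call sites above (internals are assumptions):

// @karakeep/trpc/models/backups (sketch; not part of this diff)
interface BackupModel {
  id: string;
  update(fields: {
    status?: "pending" | "success" | "failure";
    errorMessage?: string;
    size?: number;
    bookmarkCount?: number;
    assetId?: string;
  }): Promise<void>;
  // Presumably also deletes the underlying BACKUP asset (see the
  // "fix delete asset" commit).
  delete(): Promise<void>;
}
// Statics used above: Backup.create(ctx), Backup.fromId(ctx, id),
// Backup.findOldBackups(ctx, retentionDays)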
diff --git a/apps/workers/workers/utils/fetchBookmarks.ts b/apps/workers/workers/utils/fetchBookmarks.ts
new file mode 100644
index 00000000..0f357996
--- /dev/null
+++ b/apps/workers/workers/utils/fetchBookmarks.ts
@@ -0,0 +1,131 @@
+import { asc, eq } from "drizzle-orm";
+
+import type { ZBookmark } from "@karakeep/shared/types/bookmarks";
+import type { ZCursor } from "@karakeep/shared/types/pagination";
+import type { AuthedContext } from "@karakeep/trpc";
+import { db } from "@karakeep/db";
+import { bookmarks } from "@karakeep/db/schema";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+import { Bookmark } from "@karakeep/trpc/models/bookmarks";
+
+/**
+ * Fetches all bookmarks for a user with all necessary relations for export
+ * @deprecated Use fetchBookmarksInBatches for memory-efficient iteration
+ */
+export async function fetchAllBookmarksForUser(
+  dbInstance: typeof db,
+  userId: string,
+): Promise<ZBookmark[]> {
+  const allBookmarks = await dbInstance.query.bookmarks.findMany({
+    where: eq(bookmarks.userId, userId),
+    with: {
+      tagsOnBookmarks: {
+        with: {
+          tag: true,
+        },
+      },
+      link: true,
+      text: true,
+      asset: true,
+      assets: true,
+    },
+    orderBy: [asc(bookmarks.createdAt)],
+  });
+
+  // Transform to ZBookmark format
+  return allBookmarks.map((bookmark) => {
+    let content: ZBookmark["content"] | null = null;
+
+    switch (bookmark.type) {
+      case BookmarkTypes.LINK:
+        if (bookmark.link) {
+          content = {
+            type: BookmarkTypes.LINK,
+            url: bookmark.link.url,
+            title: bookmark.link.title || undefined,
+            description: bookmark.link.description || undefined,
+            imageUrl: bookmark.link.imageUrl || undefined,
+            favicon: bookmark.link.favicon || undefined,
+          };
+        }
+        break;
+      case BookmarkTypes.TEXT:
+        if (bookmark.text) {
+          content = {
+            type: BookmarkTypes.TEXT,
+            text: bookmark.text.text || "",
+          };
+        }
+        break;
+      case BookmarkTypes.ASSET:
+        if (bookmark.asset) {
+          content = {
+            type: BookmarkTypes.ASSET,
+            assetType: bookmark.asset.assetType,
+            assetId: bookmark.asset.assetId,
+          };
+        }
+        break;
+    }
+
+    return {
+      id: bookmark.id,
+      title: bookmark.title || null,
+      createdAt: bookmark.createdAt,
+      archived: bookmark.archived,
+      favourited: bookmark.favourited,
+      taggingStatus: bookmark.taggingStatus || "pending",
+      note: bookmark.note || null,
+      summary: bookmark.summary || null,
+      content,
+      tags: bookmark.tagsOnBookmarks.map((t) => ({
+        id: t.tag.id,
+        name: t.tag.name,
+        attachedBy: t.attachedBy,
+      })),
+      assets: bookmark.assets.map((a) => ({
+        id: a.id,
+        assetType: a.assetType,
+      })),
+    } as ZBookmark;
+  });
+}
+
+/**
+ * Fetches bookmarks in batches using cursor-based pagination from the Bookmark model
+ * This is memory-efficient for large datasets as it only loads one batch at a time
+ */
+export async function* fetchBookmarksInBatches(
+  ctx: AuthedContext,
+  batchSize = 1000,
+): AsyncGenerator<ZBookmark[], number> {
+  let cursor: ZCursor | null = null;
+  let totalFetched = 0;
+
+  while (true) {
+    const result = await Bookmark.loadMulti(ctx, {
+      limit: batchSize,
+      cursor: cursor,
+      sortOrder: "asc",
+      includeContent: false, // We don't need full content for export
+    });
+
+    if (result.bookmarks.length === 0) {
+      break;
+    }
+
+    // Convert Bookmark instances to ZBookmark
+    const batch = result.bookmarks.map((b) => b.asZBookmark());
+    yield batch;
+
+    totalFetched += batch.length;
+    cursor = result.nextCursor;
+
+    // If there's no next cursor, we've reached the end
+    if (!cursor) {
+      break;
+    }
+  }
+
+  return totalFetched;
+}
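Note: per the "refactor: simplify backup settings" commit, the three backup settings now live as columns on the users table; the schema and migration changes are filtered out of this 'apps'-only view. A sketch of what the Drizzle columns plausibly look like — column names match the worker queries above, while the exact builders and defaults (taken from the context defaults in userSettings.tsx) are assumptions:

// packages/db/schema.ts (sketch; not part of this diff)
import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core";

export const users = sqliteTable("user", {
  // ...existing columns...
  backupsEnabled: integer("backupsEnabled", { mode: "boolean" })
    .notNull()
    .default(false),
  backupsFrequency: text("backupsFrequency", { enum: ["daily", "weekly"] })
    .notNull()
    .default("daily"),
  backupsRetentionDays: integer("backupsRetentionDays").notNull().default(7),
});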