| author | Mohamed Bassem <me@mbassem.com> | 2025-11-29 14:53:31 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-11-29 14:53:31 +0000 |
| commit | 86a4b3966504507afd6c3adbb6a1246cafd39d83 (patch) | |
| tree | 66208555ef2720799d7196d777172b390eaf6d8f /apps/workers | |
| parent | e67c33e46626258b748eb492d124f263fb427d0d (diff) | |
| download | karakeep-86a4b3966504507afd6c3adbb6a1246cafd39d83.tar.zst | |
feat: Add automated bookmark backup feature (#2182)
* feat: Add automated bookmark backup system
Implements a comprehensive automated backup feature for user bookmarks with the following capabilities:
Database Schema:
- Add backupSettings table to store user backup preferences (enabled, frequency, retention)
- Add backups table to track backup records with status and metadata (see the sketch after this list)
- Add BACKUP asset type for storing compressed backup files
- Add migration 0066_add_backup_tables.sql
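The schema changes themselves live outside `apps/workers`, so they don't appear in the diff below. Going only by the fields the worker reads and writes (`status`, `errorMessage`, `size`, `bookmarkCount`, `assetId`), the backups table plausibly looks something like this Drizzle sketch; everything beyond those five columns is an assumption:

```ts
// Hypothetical Drizzle sketch of the backups table. Only status,
// errorMessage, size, bookmarkCount and assetId are confirmed by the
// worker code below; the other columns and all defaults are guesses.
import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core";

export const backups = sqliteTable("backups", {
  id: text("id").primaryKey(),
  userId: text("userId").notNull(),
  status: text("status", { enum: ["pending", "success", "failure"] })
    .notNull()
    .default("pending"),
  errorMessage: text("errorMessage"),
  size: integer("size"),
  bookmarkCount: integer("bookmarkCount"),
  assetId: text("assetId"),
  createdAt: integer("createdAt", { mode: "timestamp" }).notNull(),
});
```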
Background Workers:
- Implement BackupSchedulingWorker cron job (runs daily at midnight UTC)
- Create BackupWorker to process individual backup jobs
- Deterministic scheduling spreads backup jobs across 24 hours based on a hash of the user ID (see the sketch after this list)
- Support for daily and weekly backup frequencies
- Automated retention cleanup to delete old backups based on user settings
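The load-spreading logic appears in full in `backupWorker.ts` below; condensed, it hashes the user ID and takes the result modulo the scheduling window, so every user lands on a stable, pseudo-random time of day:

```ts
import { createHash } from "node:crypto";

const DAY_MS = 24 * 60 * 60 * 1000; // 86_400_000

// Condensed from the scheduler in backupWorker.ts below: the same user ID
// always hashes to the same delay, so each user's backup runs at a stable
// time of day while the overall load is spread across 24 hours.
function backupDelayMs(userId: string): number {
  const hash = createHash("sha256").update(userId).digest("hex");
  const hashNum = parseInt(hash.substring(0, 8), 16);
  return hashNum % DAY_MS;
}

// Weekly users additionally only run when hashNum % 7 === today's getDay().
```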
Export & Compression:
- Reuse existing export functionality for bookmark data
- Compress exports using Node.js built-in zlib (gzip level 9; see the sketch after this list)
- Store compressed backups as assets with proper metadata
- Track backup size and bookmark count for statistics
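A later commit in this PR ("use zip instead of gzip") replaced this with an archiver-based zip, which is what the merged diff below contains. For reference, a minimal sketch of the zlib approach described here, assuming the export has already been written to a JSON file on disk:

```ts
import { createReadStream, createWriteStream } from "node:fs";
import { createGzip } from "node:zlib";
import { pipeline } from "node:stream/promises";

// Sketch of the original zlib approach (level 9 = maximum compression);
// the merged code instead builds a zip archive via archiver, see the diff.
async function gzipExport(jsonPath: string, outPath: string): Promise<void> {
  await pipeline(
    createReadStream(jsonPath),
    createGzip({ level: 9 }),
    createWriteStream(outPath),
  );
}
```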
tRPC API:
- backups.getSettings - Retrieve user backup configuration
- backups.updateSettings - Update backup preferences
- backups.list - List all user backups with metadata
- backups.get - Get specific backup details
- backups.delete - Delete a backup
- backups.download - Download backup file (base64 encoded)
- backups.triggerBackup - Manually trigger backup creation (example calls after this list)
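Hypothetical client-side usage, assuming a standard tRPC proxy client with this router mounted at `backups`. The procedure names come from the list above, but the input and output field names aren't visible in this diff and are guesses:

```ts
// Illustrative only: procedure names are from the list above, field
// names are assumptions. `trpc` is an already-configured tRPC client.
await trpc.backups.updateSettings.mutate({
  enabled: true,
  frequency: "weekly", // "daily" | "weekly"
  retentionDays: 30, // 1-365 per the UI described below
});

const backup = await trpc.backups.triggerBackup.mutate();
const all = await trpc.backups.list.query();
await trpc.backups.delete.mutate({ backupId: backup.id });
```

Note that a later refactor in this PR removes the `download` mutation in favor of direct `/api/assets/{id}` links.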
UI Components:
- BackupSettings component with configuration form
- Enable/disable automatic backups toggle
- Frequency selection (daily/weekly)
- Retention period configuration (1-365 days)
- Backup list table with download and delete actions
- Manual backup trigger button
- Display backup stats (size, bookmark count, status)
- Added backups page to settings navigation
Technical Details:
- Uses Restate queue system for distributed job processing
- Implements idempotency keys to prevent duplicate backups (see the sketch after this list)
- Background worker concurrency: 2 jobs at a time
- 10-minute timeout for large backup exports
- Proper error handling and logging throughout
- Type-safe implementation with Zod schemas
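The idempotency scheme is visible in the scheduler below: one key per user per UTC day, so rerunning the midnight cron can't double-enqueue. Condensed into a self-contained sketch:

```ts
import { BackupQueue } from "@karakeep/shared-server";

// Condensed from BackupSchedulingWorker in the diff below: the key is
// "<userId>-<YYYY-MM-DD>", so a rerun of the midnight cron on the same
// day deduplicates instead of enqueueing a second backup for the user.
async function enqueueDailyBackup(userId: string, delayMs: number) {
  const currentDay = new Date().toISOString().split("T")[0]; // YYYY-MM-DD
  await BackupQueue.enqueue(
    { userId },
    { delayMs, idempotencyKey: `${userId}-${currentDay}` },
  );
}
```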
* refactor: simplify backup settings and asset handling
- Move backup settings from separate table to user table columns
- Update BackupSettings model to use static methods with users table
- Remove download mutation in favor of direct asset links
- Implement proper quota checks using QuotaService.checkStorageQuota
- Update UI to use new property names and direct asset downloads
- Update shared types to match new schema
Key changes:
- backupSettingsTable removed; settings now live as columns on the users table (see the sketch after this list)
- Backup downloads use direct /api/assets/{id} links
- Quota properly validated before creating backup assets
- Cleaner separation of concerns in tRPC models
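The worker below reads three columns off the users table: `backupsEnabled`, `backupsFrequency`, and `backupsRetentionDays`. The migration itself is outside `apps/workers`, but the moved settings presumably look roughly like this; the exact types, defaults, and constraints are assumptions:

```ts
// Hypothetical sketch of the backup settings columns moved onto the
// users table. The three column names match what the worker reads;
// defaults and constraints are guesses.
import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core";

export const users = sqliteTable("user", {
  id: text("id").primaryKey(),
  // ...existing user columns elided...
  backupsEnabled: integer("backupsEnabled", { mode: "boolean" })
    .notNull()
    .default(false),
  backupsFrequency: text("backupsFrequency", { enum: ["daily", "weekly"] })
    .notNull()
    .default("weekly"),
  backupsRetentionDays: integer("backupsRetentionDays").notNull().default(30),
});
```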
* migration
* use zip instead of gzip
* fix drizzle
* fix settings
* streaming json
* remove more dead code
* add e2e tests
* return backup
* poll for backups
* more fixes
* more fixes
* fix test
* fix UI
* fix delete asset
* fix ui
* redirect for backup download
* cleanups
* fix idempotency
* fix tests
* add ratelimit
* add error handling for background backups
* i18n
* model changes
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/index.ts | 9 |
| -rw-r--r-- | apps/workers/package.json | 2 |
| -rw-r--r-- | apps/workers/workers/backupWorker.ts | 431 |
| -rw-r--r-- | apps/workers/workers/utils/fetchBookmarks.ts | 131 |
4 files changed, 573 insertions, 0 deletions
```diff
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 38f831d7..b605b50f 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,6 +13,7 @@ import logger from "@karakeep/shared/logger";
 import { shutdownPromise } from "./exit";
 import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
 import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
+import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
 import { CrawlerWorker } from "./workers/crawlerWorker";
 import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
 import { OpenAiWorker } from "./workers/inference/inferenceWorker";
@@ -31,6 +32,7 @@ const workerBuilders = {
   assetPreprocessing: () => AssetPreprocessingWorker.build(),
   webhook: () => WebhookWorker.build(),
   ruleEngine: () => RuleEngineWorker.build(),
+  backup: () => BackupWorker.build(),
 } as const;
 
 type WorkerName = keyof typeof workerBuilders;
@@ -69,6 +71,10 @@ async function main() {
     FeedRefreshingWorker.start();
   }
 
+  if (workers.some((w) => w.name === "backup")) {
+    BackupSchedulingWorker.start();
+  }
+
   await Promise.any([
     Promise.all([
       ...workers.map(({ worker }) => worker.run()),
@@ -84,6 +90,9 @@ async function main() {
   if (workers.some((w) => w.name === "feed")) {
     FeedRefreshingWorker.stop();
   }
+  if (workers.some((w) => w.name === "backup")) {
+    BackupSchedulingWorker.stop();
+  }
   for (const { worker } of workers) {
     worker.stop();
   }
diff --git a/apps/workers/package.json b/apps/workers/package.json
index fb10583b..1b5b2c95 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -15,6 +15,7 @@
     "@karakeep/tsconfig": "workspace:^0.1.0",
     "@mozilla/readability": "^0.6.0",
     "@tsconfig/node22": "^22.0.0",
+    "archiver": "^7.0.1",
     "async-mutex": "^0.4.1",
     "dompurify": "^3.2.4",
     "dotenv": "^16.4.1",
@@ -57,6 +58,7 @@
   },
   "devDependencies": {
     "@karakeep/prettier-config": "workspace:^0.1.0",
+    "@types/archiver": "^7.0.0",
     "@types/jsdom": "^21.1.6",
     "@types/node-cron": "^3.0.11",
     "tsdown": "^0.12.9"
diff --git a/apps/workers/workers/backupWorker.ts b/apps/workers/workers/backupWorker.ts
new file mode 100644
index 00000000..c2d1ae5a
--- /dev/null
+++ b/apps/workers/workers/backupWorker.ts
@@ -0,0 +1,431 @@
+import { createHash } from "node:crypto";
+import { createWriteStream } from "node:fs";
+import { stat, unlink } from "node:fs/promises";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { createId } from "@paralleldrive/cuid2";
+import archiver from "archiver";
+import { eq } from "drizzle-orm";
+import { workerStatsCounter } from "metrics";
+import cron from "node-cron";
+
+import type { ZBackupRequest } from "@karakeep/shared-server";
+import { db } from "@karakeep/db";
+import { assets, AssetTypes, users } from "@karakeep/db/schema";
+import { BackupQueue, QuotaService } from "@karakeep/shared-server";
+import { saveAssetFromFile } from "@karakeep/shared/assetdb";
+import { toExportFormat } from "@karakeep/shared/import-export";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
+import { AuthedContext } from "@karakeep/trpc";
+import { Backup } from "@karakeep/trpc/models/backups";
+
+import { buildImpersonatingAuthedContext } from "../trpc";
+import { fetchBookmarksInBatches } from "./utils/fetchBookmarks";
+
+// Run daily at midnight UTC
+export const BackupSchedulingWorker = cron.schedule(
+  "0 0 * * *",
+  async () => {
+    logger.info("[backup] Scheduling daily backup jobs ...");
+    try {
+      const usersWithBackups = await db.query.users.findMany({
+        columns: {
+          id: true,
+          backupsFrequency: true,
+        },
+        where: eq(users.backupsEnabled, true),
+      });
+
+      logger.info(
+        `[backup] Found ${usersWithBackups.length} users with backups enabled`,
+      );
+
+      const now = new Date();
+      const currentDay = now.toISOString().split("T")[0]; // YYYY-MM-DD
+
+      for (const user of usersWithBackups) {
+        // Deterministically schedule backups throughout the day based on user ID
+        // This spreads the load across 24 hours
+        const hash = createHash("sha256").update(user.id).digest("hex");
+        const hashNum = parseInt(hash.substring(0, 8), 16);
+
+        // For daily: schedule within 24 hours
+        // For weekly: only schedule on the user's designated day of week
+        let shouldSchedule = false;
+        let delayMs = 0;
+
+        if (user.backupsFrequency === "daily") {
+          shouldSchedule = true;
+          // Spread across 24 hours (86400000 ms)
+          delayMs = hashNum % 86400000;
+        } else if (user.backupsFrequency === "weekly") {
+          // Use hash to determine day of week (0-6)
+          const userDayOfWeek = hashNum % 7;
+          const currentDayOfWeek = now.getDay();
+
+          if (userDayOfWeek === currentDayOfWeek) {
+            shouldSchedule = true;
+            // Spread across 24 hours
+            delayMs = hashNum % 86400000;
+          }
+        }
+
+        if (shouldSchedule) {
+          const idempotencyKey = `${user.id}-${currentDay}`;
+
+          await BackupQueue.enqueue(
+            {
+              userId: user.id,
+            },
+            {
+              delayMs,
+              idempotencyKey,
+            },
+          );
+
+          logger.info(
+            `[backup] Scheduled backup for user ${user.id} with delay ${Math.round(delayMs / 1000 / 60)} minutes`,
+          );
+        }
+      }
+
+      logger.info("[backup] Finished scheduling backup jobs");
+    } catch (error) {
+      logger.error(`[backup] Error scheduling backup jobs: ${error}`);
+    }
+  },
+  {
+    runOnInit: false,
+    scheduled: false,
+  },
+);
+
+export class BackupWorker {
+  static async build() {
+    logger.info("Starting backup worker ...");
+    const worker = (await getQueueClient())!.createRunner<ZBackupRequest>(
+      BackupQueue,
+      {
+        run: run,
+        onComplete: async (job) => {
+          workerStatsCounter.labels("backup", "completed").inc();
+          const jobId = job.id;
+          logger.info(`[backup][${jobId}] Completed successfully`);
+        },
+        onError: async (job) => {
+          workerStatsCounter.labels("backup", "failed").inc();
+          if (job.numRetriesLeft == 0) {
+            workerStatsCounter.labels("backup", "failed_permanent").inc();
+          }
+          const jobId = job.id;
+          logger.error(
+            `[backup][${jobId}] Backup job failed: ${job.error}\n${job.error?.stack}`,
+          );
+
+          // Mark backup as failed
+          if (job.data?.backupId && job.data?.userId) {
+            try {
+              const authCtx = await buildImpersonatingAuthedContext(
+                job.data.userId,
+              );
+              const backup = await Backup.fromId(authCtx, job.data.backupId);
+              await backup.update({
+                status: "failure",
+                errorMessage: job.error?.message || "Unknown error",
+              });
+            } catch (err) {
+              logger.error(
+                `[backup][${jobId}] Failed to mark backup as failed: ${err}`,
+              );
+            }
+          }
+        },
+      },
+      {
+        concurrency: 2, // Process 2 backups at a time
+        pollIntervalMs: 5000,
+        timeoutSecs: 600, // 10 minutes timeout for large exports
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function run(req: DequeuedJob<ZBackupRequest>) {
+  const jobId = req.id;
+  const userId = req.data.userId;
+  const backupId = req.data.backupId;
+
+  logger.info(`[backup][${jobId}] Starting backup for user ${userId} ...`);
+
+  // Fetch user settings to check if backups are enabled and get retention
+  const user = await db.query.users.findFirst({
+    columns: {
+      id: true,
+      backupsRetentionDays: true,
+    },
+    where: eq(users.id, userId),
+  });
+
+  if (!user) {
+    logger.info(`[backup][${jobId}] User not found: ${userId}. Skipping.`);
+    return;
+  }
+
+  const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
+  const tempJsonPath = join(
+    tmpdir(),
+    `karakeep-backup-${userId}-${timestamp}.json`,
+  );
+  const tempZipPath = join(
+    tmpdir(),
+    `karakeep-backup-${userId}-${timestamp}.zip`,
+  );
+
+  let backup: Backup | null = null;
+
+  try {
+    // Step 1: Stream bookmarks to JSON file
+    const ctx = await buildImpersonatingAuthedContext(userId);
+    const backupInstance = await (backupId
+      ? Backup.fromId(ctx, backupId)
+      : Backup.create(ctx));
+    backup = backupInstance;
+    // Ensure backupId is attached to job data so error handler can mark failure.
+    req.data.backupId = backupInstance.id;
+
+    const bookmarkCount = await streamBookmarksToJsonFile(
+      ctx,
+      tempJsonPath,
+      jobId,
+    );
+
+    logger.info(
+      `[backup][${jobId}] Streamed ${bookmarkCount} bookmarks to JSON file`,
+    );
+
+    // Step 2: Compress the JSON file as zip
+    logger.info(`[backup][${jobId}] Compressing JSON file as zip ...`);
+    await createZipArchiveFromFile(tempJsonPath, timestamp, tempZipPath);
+
+    const fileStats = await stat(tempZipPath);
+    const compressedSize = fileStats.size;
+    const jsonStats = await stat(tempJsonPath);
+
+    logger.info(
+      `[backup][${jobId}] Compressed ${jsonStats.size} bytes to ${compressedSize} bytes`,
+    );
+
+    // Step 3: Check quota and store as asset
+    const quotaApproval = await QuotaService.checkStorageQuota(
+      db,
+      userId,
+      compressedSize,
+    );
+    const assetId = createId();
+    const fileName = `karakeep-backup-${timestamp}.zip`;
+
+    // Step 4: Create asset record
+    await db.insert(assets).values({
+      id: assetId,
+      assetType: AssetTypes.BACKUP,
+      size: compressedSize,
+      contentType: "application/zip",
+      fileName: fileName,
+      bookmarkId: null,
+      userId: userId,
+    });
+    await saveAssetFromFile({
+      userId,
+      assetId,
+      assetPath: tempZipPath,
+      metadata: {
+        contentType: "application/zip",
+        fileName,
+      },
+      quotaApproved: quotaApproval,
+    });
+
+    // Step 5: Update backup record
+    await backupInstance.update({
+      size: compressedSize,
+      bookmarkCount: bookmarkCount,
+      status: "success",
+      assetId,
+    });
+
+    logger.info(
+      `[backup][${jobId}] Successfully created backup for user ${userId} with ${bookmarkCount} bookmarks (${compressedSize} bytes)`,
+    );
+
+    // Step 6: Clean up old backups based on retention
+    await cleanupOldBackups(ctx, user.backupsRetentionDays, jobId);
+  } catch (error) {
+    if (backup) {
+      try {
+        await backup.update({
+          status: "failure",
+          errorMessage:
+            error instanceof Error ? error.message : "Unknown error",
+        });
+      } catch (updateError) {
+        logger.error(
+          `[backup][${jobId}] Failed to mark backup ${backup.id} as failed: ${updateError}`,
+        );
+      }
+    }
+    throw error;
+  } finally {
+    // Final cleanup of temporary files
+    try {
+      await unlink(tempJsonPath);
+    } catch {
+      // Ignore errors during cleanup
+    }
+    try {
+      await unlink(tempZipPath);
+    } catch {
+      // Ignore errors during cleanup
+    }
+  }
+}
+
+/**
+ * Streams bookmarks to a JSON file in batches to avoid loading everything into memory
+ * @returns The total number of bookmarks written
+ */
+async function streamBookmarksToJsonFile(
+  ctx: AuthedContext,
+  outputPath: string,
+  jobId: string,
+): Promise<number> {
+  return new Promise((resolve, reject) => {
+    const writeStream = createWriteStream(outputPath, { encoding: "utf-8" });
+    let bookmarkCount = 0;
+    let isFirst = true;
+
+    writeStream.on("error", reject);
+
+    // Start JSON structure
+    writeStream.write('{"bookmarks":[');
+
+    (async () => {
+      try {
+        for await (const batch of fetchBookmarksInBatches(ctx, 1000)) {
+          for (const bookmark of batch) {
+            const exported = toExportFormat(bookmark);
+            if (exported.content !== null) {
+              // Add comma separator for all items except the first
+              if (!isFirst) {
+                writeStream.write(",");
+              }
+              writeStream.write(JSON.stringify(exported));
+              isFirst = false;
+              bookmarkCount++;
+            }
+          }
+
+          // Log progress every batch
+          if (bookmarkCount % 1000 === 0) {
+            logger.info(
+              `[backup][${jobId}] Streamed ${bookmarkCount} bookmarks so far...`,
+            );
+          }
+        }
+
+        // Close JSON structure
+        writeStream.write("]}");
+        writeStream.end();
+
+        writeStream.on("finish", () => {
+          resolve(bookmarkCount);
+        });
+      } catch (error) {
+        writeStream.destroy();
+        reject(error);
+      }
+    })();
+  });
+}
+
+/**
+ * Creates a zip archive from a JSON file (streaming from disk instead of memory)
+ */
+async function createZipArchiveFromFile(
+  jsonFilePath: string,
+  timestamp: string,
+  outputPath: string,
+): Promise<void> {
+  return new Promise((resolve, reject) => {
+    const archive = archiver("zip", {
+      zlib: { level: 9 }, // Maximum compression
+    });
+
+    const output = createWriteStream(outputPath);
+
+    output.on("close", () => {
+      resolve();
+    });
+
+    output.on("error", reject);
+    archive.on("error", reject);
+
+    // Pipe archive data to the file
+    archive.pipe(output);
+
+    // Add the JSON file to the zip (streaming from disk)
+    const jsonFileName = `karakeep-backup-${timestamp}.json`;
+    archive.file(jsonFilePath, { name: jsonFileName });
+
+    archive.finalize();
+  });
+}
+
+/**
+ * Cleans up old backups based on retention policy
+ */
+async function cleanupOldBackups(
+  ctx: AuthedContext,
+  retentionDays: number,
+  jobId: string,
+) {
+  try {
+    logger.info(
+      `[backup][${jobId}] Cleaning up backups older than ${retentionDays} days for user ${ctx.user.id} ...`,
+    );
+
+    const oldBackups = await Backup.findOldBackups(ctx, retentionDays);
+
+    if (oldBackups.length === 0) {
+      return;
+    }
+
+    logger.info(
+      `[backup][${jobId}] Found ${oldBackups.length} old backups to delete for user ${ctx.user.id}`,
+    );
+
+    // Delete each backup using the model's delete method
+    for (const backup of oldBackups) {
+      try {
+        await backup.delete();
+        logger.info(
+          `[backup][${jobId}] Deleted backup ${backup.id} for user ${ctx.user.id}`,
+        );
+      } catch (error) {
+        logger.warn(
+          `[backup][${jobId}] Failed to delete backup ${backup.id}: ${error}`,
+        );
+      }
+    }
+
+    logger.info(
+      `[backup][${jobId}] Successfully cleaned up ${oldBackups.length} old backups for user ${ctx.user.id}`,
+    );
+  } catch (error) {
+    logger.error(
+      `[backup][${jobId}] Error cleaning up old backups for user ${ctx.user.id}: ${error}`,
+    );
+  }
+}
diff --git a/apps/workers/workers/utils/fetchBookmarks.ts b/apps/workers/workers/utils/fetchBookmarks.ts
new file mode 100644
index 00000000..0f357996
--- /dev/null
+++ b/apps/workers/workers/utils/fetchBookmarks.ts
@@ -0,0 +1,131 @@
+import { asc, eq } from "drizzle-orm";
+
+import type { ZBookmark } from "@karakeep/shared/types/bookmarks";
+import type { ZCursor } from "@karakeep/shared/types/pagination";
+import type { AuthedContext } from "@karakeep/trpc";
+import { db } from "@karakeep/db";
+import { bookmarks } from "@karakeep/db/schema";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+import { Bookmark } from "@karakeep/trpc/models/bookmarks";
+
+/**
+ * Fetches all bookmarks for a user with all necessary relations for export
+ * @deprecated Use fetchBookmarksInBatches for memory-efficient iteration
+ */
+export async function fetchAllBookmarksForUser(
+  dbInstance: typeof db,
+  userId: string,
+): Promise<ZBookmark[]> {
+  const allBookmarks = await dbInstance.query.bookmarks.findMany({
+    where: eq(bookmarks.userId, userId),
+    with: {
+      tagsOnBookmarks: {
+        with: {
+          tag: true,
+        },
+      },
+      link: true,
+      text: true,
+      asset: true,
+      assets: true,
+    },
+    orderBy: [asc(bookmarks.createdAt)],
+  });
+
+  // Transform to ZBookmark format
+  return allBookmarks.map((bookmark) => {
+    let content: ZBookmark["content"] | null = null;
+
+    switch (bookmark.type) {
+      case BookmarkTypes.LINK:
+        if (bookmark.link) {
+          content = {
+            type: BookmarkTypes.LINK,
+            url: bookmark.link.url,
+            title: bookmark.link.title || undefined,
+            description: bookmark.link.description || undefined,
+            imageUrl: bookmark.link.imageUrl || undefined,
+            favicon: bookmark.link.favicon || undefined,
+          };
+        }
+        break;
+      case BookmarkTypes.TEXT:
+        if (bookmark.text) {
+          content = {
+            type: BookmarkTypes.TEXT,
+            text: bookmark.text.text || "",
+          };
+        }
+        break;
+      case BookmarkTypes.ASSET:
+        if (bookmark.asset) {
+          content = {
+            type: BookmarkTypes.ASSET,
+            assetType: bookmark.asset.assetType,
+            assetId: bookmark.asset.assetId,
+          };
+        }
+        break;
+    }
+
+    return {
+      id: bookmark.id,
+      title: bookmark.title || null,
+      createdAt: bookmark.createdAt,
+      archived: bookmark.archived,
+      favourited: bookmark.favourited,
+      taggingStatus: bookmark.taggingStatus || "pending",
+      note: bookmark.note || null,
+      summary: bookmark.summary || null,
+      content,
+      tags: bookmark.tagsOnBookmarks.map((t) => ({
+        id: t.tag.id,
+        name: t.tag.name,
+        attachedBy: t.attachedBy,
+      })),
+      assets: bookmark.assets.map((a) => ({
+        id: a.id,
+        assetType: a.assetType,
+      })),
+    } as ZBookmark;
+  });
+}
+
+/**
+ * Fetches bookmarks in batches using cursor-based pagination from the Bookmark model
+ * This is memory-efficient for large datasets as it only loads one batch at a time
+ */
+export async function* fetchBookmarksInBatches(
+  ctx: AuthedContext,
+  batchSize = 1000,
+): AsyncGenerator<ZBookmark[], number, undefined> {
+  let cursor: ZCursor | null = null;
+  let totalFetched = 0;
+
+  while (true) {
+    const result = await Bookmark.loadMulti(ctx, {
+      limit: batchSize,
+      cursor: cursor,
+      sortOrder: "asc",
+      includeContent: false, // We don't need full content for export
+    });
+
+    if (result.bookmarks.length === 0) {
+      break;
+    }
+
+    // Convert Bookmark instances to ZBookmark
+    const batch = result.bookmarks.map((b) => b.asZBookmark());
+    yield batch;
+
+    totalFetched += batch.length;
+    cursor = result.nextCursor;
+
+    // If there's no next cursor, we've reached the end
+    if (!cursor) {
+      break;
+    }
+  }
+
+  return totalFetched;
+}
```
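One subtlety in `fetchBookmarksInBatches`: the bookmark total travels through the generator's return value, and `for await...of` discards return values, which is why the backup worker counts bookmarks itself as it writes them. A caller that wanted the generator's own total would have to drive the iterator manually; a sketch (the relative import path is illustrative):

```ts
import type { ZBookmark } from "@karakeep/shared/types/bookmarks";
import type { AuthedContext } from "@karakeep/trpc";

import { fetchBookmarksInBatches } from "./workers/utils/fetchBookmarks";

// for-await-of discards an async generator's return value, so to observe
// totalFetched the caller drives the iterator manually via next().
async function countBookmarks(ctx: AuthedContext): Promise<number> {
  const gen = fetchBookmarksInBatches(ctx, 1000);
  let result = await gen.next();
  while (!result.done) {
    const batch: ZBookmark[] = result.value;
    // ...process batch...
    result = await gen.next();
  }
  return result.value; // totalFetched
}
```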
