aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-11-29 14:53:31 +0000
committerGitHub <noreply@github.com>2025-11-29 14:53:31 +0000
commit86a4b3966504507afd6c3adbb6a1246cafd39d83 (patch)
tree66208555ef2720799d7196d777172b390eaf6d8f /apps/workers
parente67c33e46626258b748eb492d124f263fb427d0d (diff)
downloadkarakeep-86a4b3966504507afd6c3adbb6a1246cafd39d83.tar.zst
feat: Add automated bookmark backup feature (#2182)
* feat: Add automated bookmark backup system Implements a comprehensive automated backup feature for user bookmarks with the following capabilities: Database Schema: - Add backupSettings table to store user backup preferences (enabled, frequency, retention) - Add backups table to track backup records with status and metadata - Add BACKUP asset type for storing compressed backup files - Add migration 0066_add_backup_tables.sql Background Workers: - Implement BackupSchedulingWorker cron job (runs daily at midnight UTC) - Create BackupWorker to process individual backup jobs - Deterministic scheduling spreads backup jobs across 24 hours based on user ID hash - Support for daily and weekly backup frequencies - Automated retention cleanup to delete old backups based on user settings Export & Compression: - Reuse existing export functionality for bookmark data - Compress exports using Node.js built-in zlib (gzip level 9) - Store compressed backups as assets with proper metadata - Track backup size and bookmark count for statistics tRPC API: - backups.getSettings - Retrieve user backup configuration - backups.updateSettings - Update backup preferences - backups.list - List all user backups with metadata - backups.get - Get specific backup details - backups.delete - Delete a backup - backups.download - Download backup file (base64 encoded) - backups.triggerBackup - Manually trigger backup creation UI Components: - BackupSettings component with configuration form - Enable/disable automatic backups toggle - Frequency selection (daily/weekly) - Retention period configuration (1-365 days) - Backup list table with download and delete actions - Manual backup trigger button - Display backup stats (size, bookmark count, status) - Added backups page to settings navigation Technical Details: - Uses Restate queue system for distributed job processing - Implements idempotency keys to prevent duplicate backups - Background worker concurrency: 2 jobs at a time - 10-minute timeout for 
large backup exports - Proper error handling and logging throughout - Type-safe implementation with Zod schemas * refactor: simplify backup settings and asset handling - Move backup settings from separate table to user table columns - Update BackupSettings model to use static methods with users table - Remove download mutation in favor of direct asset links - Implement proper quota checks using QuotaService.checkStorageQuota - Update UI to use new property names and direct asset downloads - Update shared types to match new schema Key changes: - backupSettingsTable removed, settings now in users table - Backup downloads use direct /api/assets/{id} links - Quota properly validated before creating backup assets - Cleaner separation of concerns in tRPC models * migration * use zip instead of gzip * fix drizzle * fix settings * streaming json * remove more dead code * add e2e tests * return backup * poll for backups * more fixes * more fixes * fix test * fix UI * fix delete asset * fix ui * redirect for backup download * cleanups * fix idempotency * fix tests * add ratelimit * add error handling for background backups * i18n * model changes --------- Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/index.ts9
-rw-r--r--apps/workers/package.json2
-rw-r--r--apps/workers/workers/backupWorker.ts431
-rw-r--r--apps/workers/workers/utils/fetchBookmarks.ts131
4 files changed, 573 insertions, 0 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 38f831d7..b605b50f 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -13,6 +13,7 @@ import logger from "@karakeep/shared/logger";
import { shutdownPromise } from "./exit";
import { AdminMaintenanceWorker } from "./workers/adminMaintenanceWorker";
import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
+import { BackupSchedulingWorker, BackupWorker } from "./workers/backupWorker";
import { CrawlerWorker } from "./workers/crawlerWorker";
import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
import { OpenAiWorker } from "./workers/inference/inferenceWorker";
@@ -31,6 +32,7 @@ const workerBuilders = {
assetPreprocessing: () => AssetPreprocessingWorker.build(),
webhook: () => WebhookWorker.build(),
ruleEngine: () => RuleEngineWorker.build(),
+ backup: () => BackupWorker.build(),
} as const;
type WorkerName = keyof typeof workerBuilders;
@@ -69,6 +71,10 @@ async function main() {
FeedRefreshingWorker.start();
}
+ if (workers.some((w) => w.name === "backup")) {
+ BackupSchedulingWorker.start();
+ }
+
await Promise.any([
Promise.all([
...workers.map(({ worker }) => worker.run()),
@@ -84,6 +90,9 @@ async function main() {
if (workers.some((w) => w.name === "feed")) {
FeedRefreshingWorker.stop();
}
+ if (workers.some((w) => w.name === "backup")) {
+ BackupSchedulingWorker.stop();
+ }
for (const { worker } of workers) {
worker.stop();
}
diff --git a/apps/workers/package.json b/apps/workers/package.json
index fb10583b..1b5b2c95 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -15,6 +15,7 @@
"@karakeep/tsconfig": "workspace:^0.1.0",
"@mozilla/readability": "^0.6.0",
"@tsconfig/node22": "^22.0.0",
+ "archiver": "^7.0.1",
"async-mutex": "^0.4.1",
"dompurify": "^3.2.4",
"dotenv": "^16.4.1",
@@ -57,6 +58,7 @@
},
"devDependencies": {
"@karakeep/prettier-config": "workspace:^0.1.0",
+ "@types/archiver": "^7.0.0",
"@types/jsdom": "^21.1.6",
"@types/node-cron": "^3.0.11",
"tsdown": "^0.12.9"
diff --git a/apps/workers/workers/backupWorker.ts b/apps/workers/workers/backupWorker.ts
new file mode 100644
index 00000000..c2d1ae5a
--- /dev/null
+++ b/apps/workers/workers/backupWorker.ts
@@ -0,0 +1,431 @@
+import { createHash } from "node:crypto";
+import { createWriteStream } from "node:fs";
+import { stat, unlink } from "node:fs/promises";
+import { tmpdir } from "node:os";
+import { join } from "node:path";
+import { createId } from "@paralleldrive/cuid2";
+import archiver from "archiver";
+import { eq } from "drizzle-orm";
+import { workerStatsCounter } from "metrics";
+import cron from "node-cron";
+
+import type { ZBackupRequest } from "@karakeep/shared-server";
+import { db } from "@karakeep/db";
+import { assets, AssetTypes, users } from "@karakeep/db/schema";
+import { BackupQueue, QuotaService } from "@karakeep/shared-server";
+import { saveAssetFromFile } from "@karakeep/shared/assetdb";
+import { toExportFormat } from "@karakeep/shared/import-export";
+import logger from "@karakeep/shared/logger";
+import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
+import { AuthedContext } from "@karakeep/trpc";
+import { Backup } from "@karakeep/trpc/models/backups";
+
+import { buildImpersonatingAuthedContext } from "../trpc";
+import { fetchBookmarksInBatches } from "./utils/fetchBookmarks";
+
+// Cron task that enqueues one backup job per eligible user.
+//
+// Fires once a day at 00:00 ("0 0 * * *"). The task is created stopped
+// (`scheduled: false`) and must be started explicitly via `.start()`.
+// NOTE(review): no `timezone` option is passed, so "midnight" is whatever
+// node-cron's default timezone is on the host — confirm this is UTC.
+//
+// Each user gets a deterministic delay derived from a hash of their id so the
+// jobs are spread over the following 24 hours instead of all firing at once.
+// Weekly users are only scheduled on the weekday their hash maps to.
+export const BackupSchedulingWorker = cron.schedule(
+  "0 0 * * *",
+  async () => {
+    logger.info("[backup] Scheduling daily backup jobs ...");
+    try {
+      const usersWithBackups = await db.query.users.findMany({
+        columns: {
+          id: true,
+          backupsFrequency: true,
+        },
+        where: eq(users.backupsEnabled, true),
+      });
+
+      logger.info(
+        `[backup] Found ${usersWithBackups.length} users with backups enabled`,
+      );
+
+      const msPerDay = 86400000;
+      const now = new Date();
+      const currentDay = now.toISOString().split("T")[0]; // YYYY-MM-DD (UTC)
+      // Take the weekday from UTC as well, so that it always refers to the
+      // same calendar day as the date used in the idempotency key.
+      const currentDayOfWeek = now.getUTCDay();
+
+      for (const user of usersWithBackups) {
+        // Stable per-user number derived from the first 32 bits of the hash.
+        const hash = createHash("sha256").update(user.id).digest("hex");
+        const hashNum = parseInt(hash.substring(0, 8), 16);
+
+        // Daily users run every day; weekly users only on their hashed
+        // weekday (0-6). Any other frequency value is never scheduled.
+        const isDueToday =
+          user.backupsFrequency === "daily" ||
+          (user.backupsFrequency === "weekly" &&
+            hashNum % 7 === currentDayOfWeek);
+        if (!isDueToday) {
+          continue;
+        }
+
+        // Spread jobs across the next 24 hours.
+        const delayMs = hashNum % msPerDay;
+
+        // Isolate failures per user: one failed enqueue must not prevent the
+        // remaining users from being scheduled.
+        try {
+          await BackupQueue.enqueue(
+            {
+              userId: user.id,
+            },
+            {
+              delayMs,
+              // One job per user per day, even if this task runs twice.
+              idempotencyKey: `${user.id}-${currentDay}`,
+            },
+          );
+
+          logger.info(
+            `[backup] Scheduled backup for user ${user.id} with delay ${Math.round(delayMs / 1000 / 60)} minutes`,
+          );
+        } catch (error) {
+          logger.error(
+            `[backup] Failed to schedule backup for user ${user.id}: ${error}`,
+          );
+        }
+      }
+
+      logger.info("[backup] Finished scheduling backup jobs");
+    } catch (error) {
+      logger.error(`[backup] Error scheduling backup jobs: ${error}`);
+    }
+  },
+  {
+    runOnInit: false,
+    scheduled: false,
+  },
+);
+
+/**
+ * Factory for the queue runner that executes backup jobs.
+ */
+export class BackupWorker {
+  /**
+   * Creates a runner bound to BackupQueue. Completed and failed jobs are
+   * counted in `workerStatsCounter`; a failed job that carries both a
+   * backupId and a userId additionally has its backup record marked as
+   * "failure".
+   */
+  static async build() {
+    logger.info("Starting backup worker ...");
+    const queueClient = await getQueueClient();
+
+    return queueClient!.createRunner<ZBackupRequest>(
+      BackupQueue,
+      {
+        run,
+        onComplete: async (job) => {
+          workerStatsCounter.labels("backup", "completed").inc();
+          logger.info(`[backup][${job.id}] Completed successfully`);
+        },
+        onError: async (job) => {
+          workerStatsCounter.labels("backup", "failed").inc();
+          if (job.numRetriesLeft == 0) {
+            workerStatsCounter.labels("backup", "failed_permanent").inc();
+          }
+          logger.error(
+            `[backup][${job.id}] Backup job failed: ${job.error}\n${job.error?.stack}`,
+          );
+
+          // Without both ids there is no backup record to flag.
+          const failedBackupId = job.data?.backupId;
+          const ownerId = job.data?.userId;
+          if (!failedBackupId || !ownerId) {
+            return;
+          }
+
+          try {
+            const authCtx = await buildImpersonatingAuthedContext(ownerId);
+            const failedBackup = await Backup.fromId(authCtx, failedBackupId);
+            await failedBackup.update({
+              status: "failure",
+              errorMessage: job.error?.message || "Unknown error",
+            });
+          } catch (err) {
+            logger.error(
+              `[backup][${job.id}] Failed to mark backup as failed: ${err}`,
+            );
+          }
+        },
+      },
+      {
+        concurrency: 2, // at most two backups in flight
+        pollIntervalMs: 5000,
+        timeoutSecs: 600, // 10 minutes, to accommodate large exports
+      },
+    );
+  }
+}
+
+/**
+ * Executes a single backup job for one user.
+ *
+ * Flow: load the user, load or create the Backup record, stream the bookmarks
+ * to a temporary JSON file, zip it, check the storage quota, store the zip as
+ * a BACKUP asset, mark the Backup record as "success", then delete backups
+ * that are past the user's retention period.
+ *
+ * On any error after the Backup record exists, the record is marked as
+ * "failure" and the error is rethrown. Both temporary files are removed in
+ * all cases. If the user no longer exists the job returns without doing
+ * anything.
+ *
+ * @param req The dequeued job; `req.data.backupId` is optional and is written
+ *   back once the Backup record is known.
+ */
+async function run(req: DequeuedJob<ZBackupRequest>) {
+ const jobId = req.id;
+ const userId = req.data.userId;
+ const backupId = req.data.backupId;
+
+ logger.info(`[backup][${jobId}] Starting backup for user ${userId} ...`);
+
+ // Fetch the user's retention setting (this also confirms the user exists).
+ // NOTE(review): the backupsEnabled flag is not read here, so a job still
+ // runs if backups were disabled after it was enqueued — confirm intended.
+ const user = await db.query.users.findFirst({
+ columns: {
+ id: true,
+ backupsRetentionDays: true,
+ },
+ where: eq(users.id, userId),
+ });
+
+ if (!user) {
+ logger.info(`[backup][${jobId}] User not found: ${userId}. Skipping.`);
+ return;
+ }
+
+ // ISO timestamp with ":" and "." replaced so it is safe in file names.
+ const timestamp = new Date().toISOString().replace(/[:.]/g, "-");
+ const tempJsonPath = join(
+ tmpdir(),
+ `karakeep-backup-${userId}-${timestamp}.json`,
+ );
+ const tempZipPath = join(
+ tmpdir(),
+ `karakeep-backup-${userId}-${timestamp}.zip`,
+ );
+
+ // Kept outside the try block so the catch handler can mark it as failed.
+ let backup: Backup | null = null;
+
+ try {
+ // Step 1: Stream bookmarks to JSON file
+ const ctx = await buildImpersonatingAuthedContext(userId);
+ // Reuse the existing record when the job carries a backupId, otherwise
+ // create a new one.
+ const backupInstance = await (backupId
+ ? Backup.fromId(ctx, backupId)
+ : Backup.create(ctx));
+ backup = backupInstance;
+ // Ensure backupId is attached to job data so error handler can mark failure.
+ req.data.backupId = backupInstance.id;
+
+ const bookmarkCount = await streamBookmarksToJsonFile(
+ ctx,
+ tempJsonPath,
+ jobId,
+ );
+
+ logger.info(
+ `[backup][${jobId}] Streamed ${bookmarkCount} bookmarks to JSON file`,
+ );
+
+ // Step 2: Compress the JSON file as zip
+ logger.info(`[backup][${jobId}] Compressing JSON file as zip ...`);
+ await createZipArchiveFromFile(tempJsonPath, timestamp, tempZipPath);
+
+ const fileStats = await stat(tempZipPath);
+ const compressedSize = fileStats.size;
+ const jsonStats = await stat(tempJsonPath);
+
+ logger.info(
+ `[backup][${jobId}] Compressed ${jsonStats.size} bytes to ${compressedSize} bytes`,
+ );
+
+ // Step 3: Check quota and store as asset
+ // The approval object returned here is handed to saveAssetFromFile below.
+ const quotaApproval = await QuotaService.checkStorageQuota(
+ db,
+ userId,
+ compressedSize,
+ );
+ const assetId = createId();
+ const fileName = `karakeep-backup-${timestamp}.zip`;
+
+ // Step 4: Create asset record
+ // The database row is inserted first, then the file is saved.
+ // NOTE(review): if saveAssetFromFile throws, the asset row inserted here
+ // is not removed by this function — confirm it is cleaned up elsewhere.
+ await db.insert(assets).values({
+ id: assetId,
+ assetType: AssetTypes.BACKUP,
+ size: compressedSize,
+ contentType: "application/zip",
+ fileName: fileName,
+ bookmarkId: null,
+ userId: userId,
+ });
+ await saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath: tempZipPath,
+ metadata: {
+ contentType: "application/zip",
+ fileName,
+ },
+ quotaApproved: quotaApproval,
+ });
+
+ // Step 5: Update backup record
+ await backupInstance.update({
+ size: compressedSize,
+ bookmarkCount: bookmarkCount,
+ status: "success",
+ assetId,
+ });
+
+ logger.info(
+ `[backup][${jobId}] Successfully created backup for user ${userId} with ${bookmarkCount} bookmarks (${compressedSize} bytes)`,
+ );
+
+ // Step 6: Clean up old backups based on retention
+ // cleanupOldBackups logs and swallows its own errors, so a cleanup failure
+ // does not turn a successful backup into a failed job.
+ await cleanupOldBackups(ctx, user.backupsRetentionDays, jobId);
+ } catch (error) {
+ // Record the failure on the backup (best effort), then rethrow so the
+ // error still propagates to the caller.
+ if (backup) {
+ try {
+ await backup.update({
+ status: "failure",
+ errorMessage:
+ error instanceof Error ? error.message : "Unknown error",
+ });
+ } catch (updateError) {
+ logger.error(
+ `[backup][${jobId}] Failed to mark backup ${backup.id} as failed: ${updateError}`,
+ );
+ }
+ }
+ throw error;
+ } finally {
+ // Final cleanup of temporary files
+ // Either file may not exist (e.g. failure before it was created), so
+ // unlink errors are deliberately ignored.
+ try {
+ await unlink(tempJsonPath);
+ } catch {
+ // Ignore errors during cleanup
+ }
+ try {
+ await unlink(tempZipPath);
+ } catch {
+ // Ignore errors during cleanup
+ }
+ }
+}
+
+/**
+ * Streams a user's bookmarks to a JSON file shaped as `{"bookmarks":[...]}`,
+ * fetching and writing one batch at a time so the full export is never held
+ * in memory.
+ *
+ * Bookmarks whose exported `content` is null are skipped and not counted.
+ * Writes honour stream backpressure: when the write buffer is full, the
+ * function waits for it to drain before producing more data.
+ *
+ * @param ctx Authenticated context of the user whose bookmarks are exported
+ * @param outputPath Path of the JSON file to create
+ * @param jobId Job identifier, used only for log messages
+ * @returns The total number of bookmarks written
+ * @throws If fetching bookmarks fails or the file stream reports an error;
+ *   the stream is destroyed before the error is rethrown
+ */
+async function streamBookmarksToJsonFile(
+  ctx: AuthedContext,
+  outputPath: string,
+  jobId: string,
+): Promise<number> {
+  const writeStream = createWriteStream(outputPath, { encoding: "utf-8" });
+
+  // Settles when the stream has flushed everything ("finish") or failed.
+  const finished = new Promise<void>((resolve, reject) => {
+    writeStream.once("finish", () => resolve());
+    writeStream.once("error", reject);
+  });
+  // The stream can fail while nothing is awaiting `finished`; attach a no-op
+  // handler so that case is not reported as an unhandled rejection. Real
+  // consumers below still observe the rejection.
+  finished.catch(() => {
+    // handled by the awaiting code paths
+  });
+
+  // Writes a chunk and, if the internal buffer is full, waits for "drain".
+  // Racing against `finished` makes a stream error abort the wait instead of
+  // hanging forever.
+  const write = async (chunk: string): Promise<void> => {
+    if (writeStream.write(chunk)) {
+      return;
+    }
+    await Promise.race([
+      new Promise<void>((resolve) => {
+        writeStream.once("drain", () => resolve());
+      }),
+      finished,
+    ]);
+  };
+
+  let bookmarkCount = 0;
+  let isFirst = true;
+
+  try {
+    // Start JSON structure
+    await write('{"bookmarks":[');
+
+    for await (const batch of fetchBookmarksInBatches(ctx, 1000)) {
+      for (const bookmark of batch) {
+        const exported = toExportFormat(bookmark);
+        if (exported.content === null) {
+          continue;
+        }
+        // Comma separator before every item except the first
+        if (!isFirst) {
+          await write(",");
+        }
+        await write(JSON.stringify(exported));
+        isFirst = false;
+        bookmarkCount++;
+      }
+
+      // Progress is only logged when the running count lands on a multiple
+      // of 1000.
+      if (bookmarkCount % 1000 === 0) {
+        logger.info(
+          `[backup][${jobId}] Streamed ${bookmarkCount} bookmarks so far...`,
+        );
+      }
+    }
+
+    // Close JSON structure and wait until everything is flushed to disk
+    await write("]}");
+    writeStream.end();
+    await finished;
+
+    return bookmarkCount;
+  } catch (error) {
+    writeStream.destroy();
+    throw error;
+  }
+}
+
+/**
+ * Creates a zip archive containing a single JSON file, streaming the input
+ * from disk rather than loading it into memory.
+ *
+ * @param jsonFilePath Path of the JSON file to add to the archive
+ * @param timestamp Used to name the entry inside the archive
+ *   (`karakeep-backup-<timestamp>.json`)
+ * @param outputPath Path of the zip file to create
+ * @returns A promise that resolves once the output file has been closed
+ * @throws (rejects) on output stream errors, archiver errors, archiver
+ *   warnings, or a failed finalize
+ */
+async function createZipArchiveFromFile(
+  jsonFilePath: string,
+  timestamp: string,
+  outputPath: string,
+): Promise<void> {
+  return new Promise((resolve, reject) => {
+    const archive = archiver("zip", {
+      zlib: { level: 9 }, // Maximum compression
+    });
+
+    const output = createWriteStream(outputPath);
+
+    // "close" fires once the file descriptor is closed, i.e. the archive is
+    // fully written.
+    output.on("close", () => {
+      resolve();
+    });
+
+    output.on("error", reject);
+    archive.on("error", reject);
+    // archiver reports non-blocking problems (such as failing to stat the
+    // input file) through "warning". The archive has exactly one entry, so
+    // any such problem means the backup is unusable: treat it as a failure
+    // instead of silently producing an incomplete zip.
+    archive.on("warning", reject);
+
+    // Pipe archive data to the file
+    archive.pipe(output);
+
+    // Add the JSON file to the zip (streaming from disk)
+    const jsonFileName = `karakeep-backup-${timestamp}.json`;
+    archive.file(jsonFilePath, { name: jsonFileName });
+
+    // finalize() returns a promise; route its rejection into ours rather than
+    // leaving it unhandled.
+    archive.finalize().catch(reject);
+  });
+}
+
+/**
+ * Deletes the user's backups that are older than the retention period.
+ *
+ * This is best effort: a failure to delete one backup is logged and the
+ * remaining ones are still attempted, and any other error is logged rather
+ * than thrown, so this function never rejects.
+ *
+ * @param ctx Authenticated context of the user whose backups are cleaned up
+ * @param retentionDays Retention period passed to `Backup.findOldBackups`
+ * @param jobId Job identifier, used only for log messages
+ */
+async function cleanupOldBackups(
+  ctx: AuthedContext,
+  retentionDays: number,
+  jobId: string,
+) {
+  try {
+    logger.info(
+      `[backup][${jobId}] Cleaning up backups older than ${retentionDays} days for user ${ctx.user.id} ...`,
+    );
+
+    const oldBackups = await Backup.findOldBackups(ctx, retentionDays);
+
+    if (oldBackups.length === 0) {
+      return;
+    }
+
+    logger.info(
+      `[backup][${jobId}] Found ${oldBackups.length} old backups to delete for user ${ctx.user.id}`,
+    );
+
+    // Delete each backup using the model's delete method, counting only the
+    // deletions that actually succeeded.
+    let deletedCount = 0;
+    for (const backup of oldBackups) {
+      try {
+        await backup.delete();
+        deletedCount++;
+        logger.info(
+          `[backup][${jobId}] Deleted backup ${backup.id} for user ${ctx.user.id}`,
+        );
+      } catch (error) {
+        logger.warn(
+          `[backup][${jobId}] Failed to delete backup ${backup.id}: ${error}`,
+        );
+      }
+    }
+
+    // Report what really happened instead of claiming full success when some
+    // deletions failed.
+    if (deletedCount === oldBackups.length) {
+      logger.info(
+        `[backup][${jobId}] Successfully cleaned up ${deletedCount} old backups for user ${ctx.user.id}`,
+      );
+    } else {
+      logger.warn(
+        `[backup][${jobId}] Cleaned up ${deletedCount} of ${oldBackups.length} old backups for user ${ctx.user.id}`,
+      );
+    }
+  } catch (error) {
+    logger.error(
+      `[backup][${jobId}] Error cleaning up old backups for user ${ctx.user.id}: ${error}`,
+    );
+  }
+}
diff --git a/apps/workers/workers/utils/fetchBookmarks.ts b/apps/workers/workers/utils/fetchBookmarks.ts
new file mode 100644
index 00000000..0f357996
--- /dev/null
+++ b/apps/workers/workers/utils/fetchBookmarks.ts
@@ -0,0 +1,131 @@
+import { asc, eq } from "drizzle-orm";
+
+import type { ZBookmark } from "@karakeep/shared/types/bookmarks";
+import type { ZCursor } from "@karakeep/shared/types/pagination";
+import type { AuthedContext } from "@karakeep/trpc";
+import { db } from "@karakeep/db";
+import { bookmarks } from "@karakeep/db/schema";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+import { Bookmark } from "@karakeep/trpc/models/bookmarks";
+
+/**
+ * Loads every bookmark owned by a user in one query (oldest first), together
+ * with its tags, link/text/asset details and attached assets, and maps each
+ * row to the ZBookmark shape.
+ * @deprecated Use fetchBookmarksInBatches for memory-efficient iteration
+ */
+export async function fetchAllBookmarksForUser(
+  dbInstance: typeof db,
+  userId: string,
+): Promise<ZBookmark[]> {
+  const rows = await dbInstance.query.bookmarks.findMany({
+    where: eq(bookmarks.userId, userId),
+    with: {
+      tagsOnBookmarks: {
+        with: {
+          tag: true,
+        },
+      },
+      link: true,
+      text: true,
+      asset: true,
+      assets: true,
+    },
+    orderBy: [asc(bookmarks.createdAt)],
+  });
+
+  // Builds the type-specific content; null when the row's type has no
+  // matching detail record (or the type is not one handled here).
+  const toContent = (
+    row: (typeof rows)[number],
+  ): ZBookmark["content"] | null => {
+    if (row.type === BookmarkTypes.LINK && row.link) {
+      return {
+        type: BookmarkTypes.LINK,
+        url: row.link.url,
+        title: row.link.title || undefined,
+        description: row.link.description || undefined,
+        imageUrl: row.link.imageUrl || undefined,
+        favicon: row.link.favicon || undefined,
+      };
+    }
+    if (row.type === BookmarkTypes.TEXT && row.text) {
+      return {
+        type: BookmarkTypes.TEXT,
+        text: row.text.text || "",
+      };
+    }
+    if (row.type === BookmarkTypes.ASSET && row.asset) {
+      return {
+        type: BookmarkTypes.ASSET,
+        assetType: row.asset.assetType,
+        assetId: row.asset.assetId,
+      };
+    }
+    return null;
+  };
+
+  const result: ZBookmark[] = [];
+  for (const row of rows) {
+    const tags = row.tagsOnBookmarks.map((link) => ({
+      id: link.tag.id,
+      name: link.tag.name,
+      attachedBy: link.attachedBy,
+    }));
+    const attachedAssets = row.assets.map((asset) => ({
+      id: asset.id,
+      assetType: asset.assetType,
+    }));
+
+    result.push({
+      id: row.id,
+      title: row.title || null,
+      createdAt: row.createdAt,
+      archived: row.archived,
+      favourited: row.favourited,
+      taggingStatus: row.taggingStatus || "pending",
+      note: row.note || null,
+      summary: row.summary || null,
+      content: toContent(row),
+      tags,
+      assets: attachedAssets,
+    } as ZBookmark);
+  }
+  return result;
+}
+
+/**
+ * Async generator that pages through a user's bookmarks via
+ * `Bookmark.loadMulti` (ascending order, cursor-based) and yields one array
+ * of ZBookmark per page, so only a single page is held at a time.
+ *
+ * Iteration ends on the first empty page or when a page has no next cursor.
+ * The generator's return value is the total number of bookmarks yielded.
+ */
+export async function* fetchBookmarksInBatches(
+  ctx: AuthedContext,
+  batchSize = 1000,
+): AsyncGenerator<ZBookmark[], number, undefined> {
+  let totalFetched = 0;
+  let cursor: ZCursor | null = null;
+  let hasMore = true;
+
+  while (hasMore) {
+    const page = await Bookmark.loadMulti(ctx, {
+      limit: batchSize,
+      cursor: cursor,
+      sortOrder: "asc",
+      includeContent: false, // full content is not needed for export
+    });
+
+    if (page.bookmarks.length === 0) {
+      hasMore = false;
+      continue;
+    }
+
+    // Convert model instances to plain ZBookmark objects before yielding.
+    const converted = page.bookmarks.map((item) => item.asZBookmark());
+    yield converted;
+    totalFetched += converted.length;
+
+    // A missing next cursor means this was the last page.
+    cursor = page.nextCursor;
+    hasMore = Boolean(cursor);
+  }
+
+  return totalFetched;
+}