aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-10-26 11:03:44 +0000
committerGitHub <noreply@github.com>2025-10-26 11:03:44 +0000
commit2b769cba822506c1572793385993737d4ffff478 (patch)
tree0e2380628c3a8691779fef6ef77e4a55f211ac51 /apps/workers
parent17136006c77b5893ad112af6475b2355d346996e (diff)
downloadkarakeep-2b769cba822506c1572793385993737d4ffff478.tar.zst
feat: Add admin maintenance job to migrate large inline HTML (#2071)
* Add admin maintenance job to migrate large inline HTML * add cursor * more fixes
Diffstat (limited to 'apps/workers')
-rw-r--r--apps/workers/constants.ts1
-rw-r--r--apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts188
-rw-r--r--apps/workers/workers/adminMaintenanceWorker.ts12
-rw-r--r--apps/workers/workers/crawlerWorker.ts3
4 files changed, 200 insertions, 4 deletions
diff --git a/apps/workers/constants.ts b/apps/workers/constants.ts
new file mode 100644
index 00000000..954466bf
--- /dev/null
+++ b/apps/workers/constants.ts
@@ -0,0 +1 @@
+export const HTML_CONTENT_SIZE_THRESHOLD = 50 * 1024; // 50KB
diff --git a/apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts b/apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts
new file mode 100644
index 00000000..c5336bce
--- /dev/null
+++ b/apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts
@@ -0,0 +1,188 @@
+import { and, asc, eq, gt, isNotNull, isNull, sql } from "drizzle-orm";
+
+import type { ZAdminMaintenanceMigrateLargeLinkHtmlTask } from "@karakeep/shared-server";
+import type { DequeuedJob } from "@karakeep/shared/queueing";
+import { db } from "@karakeep/db";
+import { AssetTypes, bookmarkLinks, bookmarks } from "@karakeep/db/schema";
+import { QuotaService } from "@karakeep/shared-server";
+import {
+ ASSET_TYPES,
+ deleteAsset,
+ newAssetId,
+ saveAsset,
+} from "@karakeep/shared/assetdb";
+import logger from "@karakeep/shared/logger";
+import { tryCatch } from "@karakeep/shared/tryCatch";
+
+import { HTML_CONTENT_SIZE_THRESHOLD } from "../../../constants";
+import { updateAsset } from "../../../workerUtils";
+
+const BATCH_SIZE = 25;
+
+interface BookmarkHtmlRow {
+ bookmarkId: string;
+ userId: string;
+ htmlContent: string;
+}
+
+async function getBookmarksWithLargeInlineHtml(limit: number, cursor?: string) {
+ const rows = await db
+ .select({
+ bookmarkId: bookmarkLinks.id,
+ userId: bookmarks.userId,
+ htmlContent: bookmarkLinks.htmlContent,
+ })
+ .from(bookmarkLinks)
+ .innerJoin(bookmarks, eq(bookmarkLinks.id, bookmarks.id))
+ .where(
+ cursor
+ ? and(
+ gt(bookmarkLinks.id, cursor),
+ isNotNull(bookmarkLinks.htmlContent),
+ isNull(bookmarkLinks.contentAssetId),
+ sql`length(CAST(${bookmarkLinks.htmlContent} AS BLOB)) > ${HTML_CONTENT_SIZE_THRESHOLD}`,
+ )
+ : and(
+ isNotNull(bookmarkLinks.htmlContent),
+ isNull(bookmarkLinks.contentAssetId),
+ sql`length(CAST(${bookmarkLinks.htmlContent} AS BLOB)) > ${HTML_CONTENT_SIZE_THRESHOLD}`,
+ ),
+ )
+ .orderBy(asc(bookmarkLinks.id))
+ .limit(limit);
+
+ return rows.filter((row): row is BookmarkHtmlRow => row.htmlContent !== null);
+}
+
+async function migrateBookmarkHtml(
+ bookmark: BookmarkHtmlRow,
+ jobId: string,
+): Promise<boolean> {
+ const { bookmarkId, userId, htmlContent } = bookmark;
+
+ const contentSize = Buffer.byteLength(htmlContent, "utf8");
+
+ if (contentSize <= HTML_CONTENT_SIZE_THRESHOLD) {
+ logger.debug(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Bookmark ${bookmarkId} inline HTML (${contentSize} bytes) below threshold, skipping`,
+ );
+ return false;
+ }
+
+ const { data: quotaApproved, error: quotaError } = await tryCatch(
+ QuotaService.checkStorageQuota(db, userId, contentSize),
+ );
+
+ if (quotaError || !quotaApproved) {
+ logger.warn(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Skipping bookmark ${bookmarkId} due to storage quota error: ${quotaError?.message}`,
+ );
+ return false;
+ }
+
+ const contentBuffer = Buffer.from(htmlContent, "utf8");
+ const assetId = newAssetId();
+ const { error: saveError } = await tryCatch(
+ saveAsset({
+ userId,
+ assetId,
+ asset: contentBuffer,
+ metadata: { contentType: ASSET_TYPES.TEXT_HTML, fileName: null },
+ quotaApproved,
+ }),
+ );
+
+ if (saveError) {
+ logger.error(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Failed to persist HTML for bookmark ${bookmarkId} as asset: ${saveError}`,
+ );
+ await deleteAsset({ userId, assetId }).catch(() => {
+ /* ignore */
+ });
+ return false;
+ }
+
+ try {
+ await db.transaction(async (txn) => {
+ const res = await txn
+ .update(bookmarkLinks)
+ .set({ htmlContent: null, contentAssetId: assetId })
+ .where(
+ and(
+ eq(bookmarkLinks.id, bookmarkId),
+ isNull(bookmarkLinks.contentAssetId),
+ ),
+ );
+
+ if (res.changes === 0) {
+ throw new Error("Failed to update bookmark");
+ }
+
+ await updateAsset(
+ undefined,
+ {
+ id: assetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_HTML_CONTENT,
+ contentType: ASSET_TYPES.TEXT_HTML,
+ size: contentSize,
+ fileName: null,
+ },
+ txn,
+ );
+ });
+ } catch (error) {
+ await deleteAsset({ userId, assetId }).catch(() => {
+ /* ignore */
+ });
+ logger.error(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Failed to update bookmark ${bookmarkId} after storing asset: ${error}`,
+ );
+ return false;
+ }
+
+ logger.info(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Migrated inline HTML (${contentSize} bytes) for bookmark ${bookmarkId} to asset ${assetId}`,
+ );
+
+ return true;
+}
+
+export async function runMigrateLargeLinkHtmlTask(
+ job: DequeuedJob<ZAdminMaintenanceMigrateLargeLinkHtmlTask>,
+): Promise<void> {
+ const jobId = job.id;
+ let migratedCount = 0;
+ let cursor: string | undefined;
+
+ while (true) {
+ const bookmarksToMigrate = await getBookmarksWithLargeInlineHtml(
+ BATCH_SIZE,
+ cursor,
+ );
+
+ if (bookmarksToMigrate.length === 0) {
+ break;
+ }
+
+ for (const bookmark of bookmarksToMigrate) {
+ try {
+ const migrated = await migrateBookmarkHtml(bookmark, jobId);
+ if (migrated) {
+ migratedCount += 1;
+ }
+ } catch (error) {
+ logger.error(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Unexpected error migrating bookmark ${bookmark.bookmarkId}: ${error}`,
+ );
+ }
+ }
+
+ cursor = bookmarksToMigrate[bookmarksToMigrate.length - 1]?.bookmarkId;
+ }
+
+ logger.info(
+ `[adminMaintenance:migrate_large_link_html][${jobId}] Completed migration. Total bookmarks migrated: ${migratedCount}`,
+ );
+}
diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts
index f8af3de0..af6aed22 100644
--- a/apps/workers/workers/adminMaintenanceWorker.ts
+++ b/apps/workers/workers/adminMaintenanceWorker.ts
@@ -2,6 +2,7 @@ import { workerStatsCounter } from "metrics";
import {
AdminMaintenanceQueue,
+ ZAdminMaintenanceMigrateLargeLinkHtmlTask,
ZAdminMaintenanceTask,
zAdminMaintenanceTaskSchema,
ZAdminMaintenanceTidyAssetsTask,
@@ -9,6 +10,7 @@ import {
import logger from "@karakeep/shared/logger";
import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing";
+import { runMigrateLargeLinkHtmlTask } from "./adminMaintenance/tasks/migrateLinkHtmlContent";
import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets";
export class AdminMaintenanceWorker {
@@ -66,9 +68,15 @@ async function runAdminMaintenance(job: DequeuedJob<ZAdminMaintenanceTask>) {
job as DequeuedJob<ZAdminMaintenanceTidyAssetsTask>,
task,
);
- default:
+ case "migrate_large_link_html":
+ return runMigrateLargeLinkHtmlTask(
+ job as DequeuedJob<ZAdminMaintenanceMigrateLargeLinkHtmlTask>,
+ );
+ default: {
+ const exhaustiveCheck: never = task;
throw new Error(
- `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`,
+ `[adminMaintenance][${jobId}] No handler registered for task ${(exhaustiveCheck as ZAdminMaintenanceTask).type}`,
);
+ }
}
}
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index e9024723..4e02d73a 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -73,6 +73,7 @@ import {
import { tryCatch } from "@karakeep/shared/tryCatch";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+import { HTML_CONTENT_SIZE_THRESHOLD } from "../constants";
import metascraperReddit from "../metascraper-plugins/metascraper-reddit";
function abortPromise(signal: AbortSignal): Promise<never> {
@@ -915,8 +916,6 @@ async function handleAsAssetBookmark(
});
}
-const HTML_CONTENT_SIZE_THRESHOLD = 50 * 1024; // 50KB
-
type StoreHtmlResult =
| { result: "stored"; assetId: string; size: number }
| { result: "store_inline" }