diff options
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/constants.ts | 1 | ||||
| -rw-r--r-- | apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts | 188 | ||||
| -rw-r--r-- | apps/workers/workers/adminMaintenanceWorker.ts | 12 | ||||
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 3 |
4 files changed, 200 insertions, 4 deletions
diff --git a/apps/workers/constants.ts b/apps/workers/constants.ts new file mode 100644 index 00000000..954466bf --- /dev/null +++ b/apps/workers/constants.ts @@ -0,0 +1 @@ +export const HTML_CONTENT_SIZE_THRESHOLD = 50 * 1024; // 50KB diff --git a/apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts b/apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts new file mode 100644 index 00000000..c5336bce --- /dev/null +++ b/apps/workers/workers/adminMaintenance/tasks/migrateLinkHtmlContent.ts @@ -0,0 +1,188 @@ +import { and, asc, eq, gt, isNotNull, isNull, sql } from "drizzle-orm"; + +import type { ZAdminMaintenanceMigrateLargeLinkHtmlTask } from "@karakeep/shared-server"; +import type { DequeuedJob } from "@karakeep/shared/queueing"; +import { db } from "@karakeep/db"; +import { AssetTypes, bookmarkLinks, bookmarks } from "@karakeep/db/schema"; +import { QuotaService } from "@karakeep/shared-server"; +import { + ASSET_TYPES, + deleteAsset, + newAssetId, + saveAsset, +} from "@karakeep/shared/assetdb"; +import logger from "@karakeep/shared/logger"; +import { tryCatch } from "@karakeep/shared/tryCatch"; + +import { HTML_CONTENT_SIZE_THRESHOLD } from "../../../constants"; +import { updateAsset } from "../../../workerUtils"; + +const BATCH_SIZE = 25; + +interface BookmarkHtmlRow { + bookmarkId: string; + userId: string; + htmlContent: string; +} + +async function getBookmarksWithLargeInlineHtml(limit: number, cursor?: string) { + const rows = await db + .select({ + bookmarkId: bookmarkLinks.id, + userId: bookmarks.userId, + htmlContent: bookmarkLinks.htmlContent, + }) + .from(bookmarkLinks) + .innerJoin(bookmarks, eq(bookmarkLinks.id, bookmarks.id)) + .where( + cursor + ? and( + gt(bookmarkLinks.id, cursor), + isNotNull(bookmarkLinks.htmlContent), + isNull(bookmarkLinks.contentAssetId), + sql`length(CAST(${bookmarkLinks.htmlContent} AS BLOB)) > ${HTML_CONTENT_SIZE_THRESHOLD}`, + ) + : and( + isNotNull(bookmarkLinks.htmlContent), + isNull(bookmarkLinks.contentAssetId), + sql`length(CAST(${bookmarkLinks.htmlContent} AS BLOB)) > ${HTML_CONTENT_SIZE_THRESHOLD}`, + ), + ) + .orderBy(asc(bookmarkLinks.id)) + .limit(limit); + + return rows.filter((row): row is BookmarkHtmlRow => row.htmlContent !== null); +} + +async function migrateBookmarkHtml( + bookmark: BookmarkHtmlRow, + jobId: string, +): Promise<boolean> { + const { bookmarkId, userId, htmlContent } = bookmark; + + const contentSize = Buffer.byteLength(htmlContent, "utf8"); + + if (contentSize <= HTML_CONTENT_SIZE_THRESHOLD) { + logger.debug( + `[adminMaintenance:migrate_large_link_html][${jobId}] Bookmark ${bookmarkId} inline HTML (${contentSize} bytes) below threshold, skipping`, + ); + return false; + } + + const { data: quotaApproved, error: quotaError } = await tryCatch( + QuotaService.checkStorageQuota(db, userId, contentSize), + ); + + if (quotaError || !quotaApproved) { + logger.warn( + `[adminMaintenance:migrate_large_link_html][${jobId}] Skipping bookmark ${bookmarkId} due to storage quota error: ${quotaError?.message}`, + ); + return false; + } + + const contentBuffer = Buffer.from(htmlContent, "utf8"); + const assetId = newAssetId(); + const { error: saveError } = await tryCatch( + saveAsset({ + userId, + assetId, + asset: contentBuffer, + metadata: { contentType: ASSET_TYPES.TEXT_HTML, fileName: null }, + quotaApproved, + }), + ); + + if (saveError) { + logger.error( + `[adminMaintenance:migrate_large_link_html][${jobId}] Failed to persist HTML for bookmark ${bookmarkId} as asset: ${saveError}`, + ); + await deleteAsset({ userId, assetId }).catch(() => { + /* ignore */ + }); + return false; + } + + try { + await db.transaction(async (txn) => { + const res = await txn + .update(bookmarkLinks) + .set({ htmlContent: null, contentAssetId: assetId }) + .where( + and( + eq(bookmarkLinks.id, bookmarkId), + isNull(bookmarkLinks.contentAssetId), + ), + ); + + if (res.changes === 0) { + throw new Error("Failed to update bookmark"); + } + + await updateAsset( + undefined, + { + id: assetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_HTML_CONTENT, + contentType: ASSET_TYPES.TEXT_HTML, + size: contentSize, + fileName: null, + }, + txn, + ); + }); + } catch (error) { + await deleteAsset({ userId, assetId }).catch(() => { + /* ignore */ + }); + logger.error( + `[adminMaintenance:migrate_large_link_html][${jobId}] Failed to update bookmark ${bookmarkId} after storing asset: ${error}`, + ); + return false; + } + + logger.info( + `[adminMaintenance:migrate_large_link_html][${jobId}] Migrated inline HTML (${contentSize} bytes) for bookmark ${bookmarkId} to asset ${assetId}`, + ); + + return true; +} + +export async function runMigrateLargeLinkHtmlTask( + job: DequeuedJob<ZAdminMaintenanceMigrateLargeLinkHtmlTask>, +): Promise<void> { + const jobId = job.id; + let migratedCount = 0; + let cursor: string | undefined; + + while (true) { + const bookmarksToMigrate = await getBookmarksWithLargeInlineHtml( + BATCH_SIZE, + cursor, + ); + + if (bookmarksToMigrate.length === 0) { + break; + } + + for (const bookmark of bookmarksToMigrate) { + try { + const migrated = await migrateBookmarkHtml(bookmark, jobId); + if (migrated) { + migratedCount += 1; + } + } catch (error) { + logger.error( + `[adminMaintenance:migrate_large_link_html][${jobId}] Unexpected error migrating bookmark ${bookmark.bookmarkId}: ${error}`, + ); + } + } + + cursor = bookmarksToMigrate[bookmarksToMigrate.length - 1]?.bookmarkId; + } + + logger.info( + `[adminMaintenance:migrate_large_link_html][${jobId}] Completed migration. Total bookmarks migrated: ${migratedCount}`, + ); +} diff --git a/apps/workers/workers/adminMaintenanceWorker.ts b/apps/workers/workers/adminMaintenanceWorker.ts index f8af3de0..af6aed22 100644 --- a/apps/workers/workers/adminMaintenanceWorker.ts +++ b/apps/workers/workers/adminMaintenanceWorker.ts @@ -2,6 +2,7 @@ import { workerStatsCounter } from "metrics"; import { AdminMaintenanceQueue, + ZAdminMaintenanceMigrateLargeLinkHtmlTask, ZAdminMaintenanceTask, zAdminMaintenanceTaskSchema, ZAdminMaintenanceTidyAssetsTask, @@ -9,6 +10,7 @@ import { import logger from "@karakeep/shared/logger"; import { DequeuedJob, getQueueClient } from "@karakeep/shared/queueing"; +import { runMigrateLargeLinkHtmlTask } from "./adminMaintenance/tasks/migrateLinkHtmlContent"; import { runTidyAssetsTask } from "./adminMaintenance/tasks/tidyAssets"; export class AdminMaintenanceWorker { @@ -66,9 +68,15 @@ async function runAdminMaintenance(job: DequeuedJob<ZAdminMaintenanceTask>) { job as DequeuedJob<ZAdminMaintenanceTidyAssetsTask>, task, ); - default: + case "migrate_large_link_html": + return runMigrateLargeLinkHtmlTask( + job as DequeuedJob<ZAdminMaintenanceMigrateLargeLinkHtmlTask>, + ); + default: { + const exhaustiveCheck: never = task; throw new Error( - `[adminMaintenance][${jobId}] No handler registered for task ${task.type}`, + `[adminMaintenance][${jobId}] No handler registered for task ${(exhaustiveCheck as ZAdminMaintenanceTask).type}`, ); + } } } diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index e9024723..4e02d73a 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -73,6 +73,7 @@ import { import { tryCatch } from "@karakeep/shared/tryCatch"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; +import { HTML_CONTENT_SIZE_THRESHOLD } from "../constants"; import metascraperReddit from "../metascraper-plugins/metascraper-reddit"; function abortPromise(signal: AbortSignal): Promise<never> { @@ -915,8 +916,6 @@ async function handleAsAssetBookmark( }); } -const HTML_CONTENT_SIZE_THRESHOLD = 50 * 1024; // 50KB - type StoreHtmlResult = | { result: "stored"; assetId: string; size: number } | { result: "store_inline" } |
