From 384432d31e7bee6bf35d8af6b7165410303ffda4 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 6 Jul 2025 15:54:49 +0000 Subject: feat: Add per user storage quota --- apps/workers/workerUtils.ts | 1 - apps/workers/workers/assetPreprocessingWorker.ts | 18 +++ apps/workers/workers/crawlerWorker.ts | 165 ++++++++++++++++------- apps/workers/workers/videoWorker.ts | 74 ++++++---- 4 files changed, 183 insertions(+), 75 deletions(-) (limited to 'apps/workers') diff --git a/apps/workers/workerUtils.ts b/apps/workers/workerUtils.ts index 44180951..d41df578 100644 --- a/apps/workers/workerUtils.ts +++ b/apps/workers/workerUtils.ts @@ -4,7 +4,6 @@ import { db, KarakeepDBTransaction } from "@karakeep/db"; import { assets, AssetTypes, bookmarks } from "@karakeep/db/schema"; type DBAssetType = typeof assets.$inferInsert; - export async function updateAsset( oldAssetId: string | undefined, newAsset: DBAssetType, diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts index d43163ac..8231ce50 100644 --- a/apps/workers/workers/assetPreprocessingWorker.ts +++ b/apps/workers/workers/assetPreprocessingWorker.ts @@ -21,6 +21,10 @@ import { OpenAIQueue, triggerSearchReindex, } from "@karakeep/shared/queues"; +import { + checkStorageQuota, + StorageQuotaError, +} from "@karakeep/trpc/lib/storageQuota"; export class AssetPreprocessingWorker { static build() { @@ -128,6 +132,13 @@ export async function extractAndSavePDFScreenshot( return false; } + // Check storage quota before inserting + const quotaApproved = await checkStorageQuota( + db, + bookmark.userId, + screenshot.buffer.byteLength, + ); + // Store the screenshot const assetId = newAssetId(); const fileName = "screenshot.png"; @@ -140,6 +151,7 @@ export async function extractAndSavePDFScreenshot( contentType, fileName, }, + quotaApproved, }); // Insert into database @@ -158,6 +170,12 @@ export async function extractAndSavePDFScreenshot( ); return true; } catch (error) { + if (error instanceof StorageQuotaError) { + logger.warn( + `[assetPreprocessing][${jobId}] Skipping PDF screenshot due to quota exceeded: ${error.message}`, + ); + return true; // Return true to indicate the job completed successfully, just skipped the asset + } logger.error( `[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`, ); diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 7d92eb51..10106a3b 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -61,6 +61,10 @@ import { zCrawlLinkRequestSchema, } from "@karakeep/shared/queues"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; +import { + checkStorageQuota, + StorageQuotaError, +} from "@karakeep/trpc/lib/storageQuota"; import metascraperReddit from "../metascraper-plugins/metascraper-reddit"; @@ -435,16 +439,35 @@ async function storeScreenshot( const assetId = newAssetId(); const contentType = "image/png"; const fileName = "screenshot.png"; - await saveAsset({ - userId, - assetId, - metadata: { contentType, fileName }, - asset: screenshot, - }); - logger.info( - `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, - ); - return { assetId, contentType, fileName, size: screenshot.byteLength }; + + // Check storage quota before saving the screenshot + try { + const quotaApproved = await checkStorageQuota( + db, + userId, + screenshot.byteLength, + ); + + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: screenshot, + quotaApproved, + }); + logger.info( + `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, + ); + return { assetId, contentType, fileName, size: screenshot.byteLength }; + } catch (error) { + if (error instanceof StorageQuotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${error.message}`, + ); + return null; + } + throw error; + } } async function downloadAndStoreFile( @@ -470,11 +493,19 @@ async function downloadAndStoreFile( throw new Error("No content type in the response"); } + // Check storage quota before saving the asset + const quotaApproved = await checkStorageQuota( + db, + userId, + buffer.byteLength, + ); + await saveAsset({ userId, assetId, metadata: { contentType }, asset: Buffer.from(buffer), + quotaApproved, }); logger.info( @@ -483,6 +514,12 @@ async function downloadAndStoreFile( return { assetId, userId, contentType, size: buffer.byteLength }; } catch (e) { + if (e instanceof StorageQuotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${e.message}`, + ); + return null; + } logger.error( `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, ); @@ -523,24 +560,52 @@ async function archiveWebpage( const contentType = "text/html"; - await saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata: { - contentType, - }, - }); + // Get file size and check quota before saving + const stats = await fs.stat(assetPath); + const fileSize = stats.size; - logger.info( - `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, - ); + try { + const quotaApproved = await checkStorageQuota(db, userId, fileSize); - return { - assetId, - contentType, - size: await getAssetSize({ userId, assetId }), - }; + await saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata: { + contentType, + }, + quotaApproved, + }); + + logger.info( + `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + ); + + return { + assetId, + contentType, + size: await getAssetSize({ userId, assetId }), + }; + } catch (error) { + if (error instanceof StorageQuotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${error.message}`, + ); + // Clean up the temporary file + try { + await fs.unlink(assetPath); + logger.info( + `[Crawler][${jobId}] Cleaned up temporary archive file: ${assetPath}`, + ); + } catch (cleanupError) { + logger.warn( + `[Crawler][${jobId}] Failed to clean up temporary archive file: ${cleanupError}`, + ); + } + return null; + } + throw error; + } } async function getContentType( @@ -761,11 +826,7 @@ async function crawlAndParseUrl( !precrawledArchiveAssetId && (serverConfig.crawler.fullPageArchive || archiveFullPage) ) { - const { - assetId: fullPageArchiveAssetId, - size, - contentType, - } = await archiveWebpage( + const archiveResult = await archiveWebpage( htmlContent, browserUrl, userId, @@ -773,23 +834,31 @@ async function crawlAndParseUrl( abortSignal, ); - await db.transaction(async (txn) => { - await updateAsset( - oldFullPageArchiveAssetId, - { - id: fullPageArchiveAssetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, - contentType, - size, - fileName: null, - }, - txn, - ); - }); - if (oldFullPageArchiveAssetId) { - silentDeleteAsset(userId, oldFullPageArchiveAssetId); + if (archiveResult) { + const { + assetId: fullPageArchiveAssetId, + size, + contentType, + } = archiveResult; + + await db.transaction(async (txn) => { + await updateAsset( + oldFullPageArchiveAssetId, + { + id: fullPageArchiveAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE, + contentType, + size, + fileName: null, + }, + txn, + ); + }); + if (oldFullPageArchiveAssetId) { + silentDeleteAsset(userId, oldFullPageArchiveAssetId); + } } } }; diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts index ca591e6f..d25c1948 100644 --- a/apps/workers/workers/videoWorker.ts +++ b/apps/workers/workers/videoWorker.ts @@ -8,7 +8,6 @@ import { db } from "@karakeep/db"; import { AssetTypes } from "@karakeep/db/schema"; import { ASSET_TYPES, - getAssetSize, newAssetId, saveAssetFromFile, silentDeleteAsset, @@ -20,6 +19,10 @@ import { ZVideoRequest, zvideoRequestSchema, } from "@karakeep/shared/queues"; +import { + checkStorageQuota, + StorageQuotaError, +} from "@karakeep/trpc/lib/storageQuota"; import { withTimeout } from "../utils"; import { getBookmarkDetails, updateAsset } from "../workerUtils"; @@ -140,32 +143,51 @@ async function runWorker(job: DequeuedJob) { logger.info( `[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`, ); - await saveAssetFromFile({ - userId, - assetId: videoAssetId, - assetPath, - metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, - }); - - await db.transaction(async (txn) => { - await updateAsset( - oldVideoAssetId, - { - id: videoAssetId, - bookmarkId, - userId, - assetType: AssetTypes.LINK_VIDEO, - contentType: ASSET_TYPES.VIDEO_MP4, - size: await getAssetSize({ userId, assetId: videoAssetId }), - }, - txn, - ); - }); - await silentDeleteAsset(userId, oldVideoAssetId); - logger.info( - `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, - ); + // Get file size and check quota before saving + const stats = await fs.promises.stat(assetPath); + const fileSize = stats.size; + + try { + const quotaApproved = await checkStorageQuota(db, userId, fileSize); + + await saveAssetFromFile({ + userId, + assetId: videoAssetId, + assetPath, + metadata: { contentType: ASSET_TYPES.VIDEO_MP4 }, + quotaApproved, + }); + + await db.transaction(async (txn) => { + await updateAsset( + oldVideoAssetId, + { + id: videoAssetId, + bookmarkId, + userId, + assetType: AssetTypes.LINK_VIDEO, + contentType: ASSET_TYPES.VIDEO_MP4, + size: fileSize, + }, + txn, + ); + }); + await silentDeleteAsset(userId, oldVideoAssetId); + + logger.info( + `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`, + ); + } catch (error) { + if (error instanceof StorageQuotaError) { + logger.warn( + `[VideoCrawler][${jobId}] Skipping video storage due to quota exceeded: ${error.message}`, + ); + await deleteLeftOverAssetFile(jobId, videoAssetId); + return; + } + throw error; + } } /** -- cgit v1.2.3-70-g09d2