Diffstat (limited to 'apps/workers')
-rw-r--r--  apps/workers/workerUtils.ts                       |   1
-rw-r--r--  apps/workers/workers/assetPreprocessingWorker.ts  |  18
-rw-r--r--  apps/workers/workers/crawlerWorker.ts             | 165
-rw-r--r--  apps/workers/workers/videoWorker.ts               |  74
4 files changed, 183 insertions, 75 deletions
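All three workers in this diff import `checkStorageQuota` and `StorageQuotaError` from `@karakeep/trpc/lib/storageQuota`, which is not part of this changeset. Judging only from the call sites below, the interface they rely on might look roughly like the sketch that follows; the shape of the value returned by `checkStorageQuota` (threaded into `saveAsset`/`saveAssetFromFile` as `quotaApproved`) is an assumption.

```ts
// Hypothetical sketch of the quota interface assumed by the workers below.
// Only the call sites in this diff are known; the field names on the
// approval value are assumptions.

// Thrown when storing `sizeInBytes` more bytes would exceed the user's quota.
export class StorageQuotaError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "StorageQuotaError";
  }
}

// Opaque approval token passed to saveAsset()/saveAssetFromFile() as
// `quotaApproved`, signalling that the quota was checked before the write.
export interface QuotaApproval {
  userId: string;
  approvedBytes: number; // assumed field
}

// `db` is typed loosely here to keep the sketch self-contained.
export async function checkStorageQuota(
  db: unknown,
  userId: string,
  sizeInBytes: number,
): Promise<QuotaApproval> {
  // A real implementation would compare the user's current usage plus
  // sizeInBytes against their configured quota and throw StorageQuotaError
  // when the limit would be exceeded.
  void db;
  return { userId, approvedBytes: sizeInBytes };
}
```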
diff --git a/apps/workers/workerUtils.ts b/apps/workers/workerUtils.ts
index 44180951..d41df578 100644
--- a/apps/workers/workerUtils.ts
+++ b/apps/workers/workerUtils.ts
@@ -4,7 +4,6 @@ import { db, KarakeepDBTransaction } from "@karakeep/db";
import { assets, AssetTypes, bookmarks } from "@karakeep/db/schema";
type DBAssetType = typeof assets.$inferInsert;
-
export async function updateAsset(
oldAssetId: string | undefined,
newAsset: DBAssetType,
diff --git a/apps/workers/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index d43163ac..8231ce50 100644
--- a/apps/workers/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -21,6 +21,10 @@ import {
OpenAIQueue,
triggerSearchReindex,
} from "@karakeep/shared/queues";
+import {
+ checkStorageQuota,
+ StorageQuotaError,
+} from "@karakeep/trpc/lib/storageQuota";
export class AssetPreprocessingWorker {
static build() {
@@ -128,6 +132,13 @@ export async function extractAndSavePDFScreenshot(
return false;
}
+ // Check storage quota before inserting
+ const quotaApproved = await checkStorageQuota(
+ db,
+ bookmark.userId,
+ screenshot.buffer.byteLength,
+ );
+
// Store the screenshot
const assetId = newAssetId();
const fileName = "screenshot.png";
@@ -140,6 +151,7 @@ export async function extractAndSavePDFScreenshot(
contentType,
fileName,
},
+ quotaApproved,
});
// Insert into database
@@ -158,6 +170,12 @@ export async function extractAndSavePDFScreenshot(
);
return true;
} catch (error) {
+ if (error instanceof StorageQuotaError) {
+ logger.warn(
+ `[assetPreprocessing][${jobId}] Skipping PDF screenshot due to quota exceeded: ${error.message}`,
+ );
+ return true; // Return true to indicate the job completed successfully, just skipped the asset
+ }
logger.error(
`[assetPreprocessing][${jobId}] Failed to process PDF screenshot: ${error}`,
);
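The PDF-screenshot path above treats a quota failure as a successful job rather than a retryable error: the `StorageQuotaError` is logged and swallowed, and the function still returns `true`. A minimal sketch of that control flow, with `storeScreenshotAsset` as a hypothetical placeholder for the quota check plus `saveAsset` call; handling of other errors is simplified here relative to the worker's pre-existing code.

```ts
import { StorageQuotaError } from "@karakeep/trpc/lib/storageQuota";

// Sketch of the skip-on-quota pattern used in extractAndSavePDFScreenshot:
// quota failures are logged and swallowed so the queue does not retry the
// job, while unrelated errors keep the worker's existing handling.
async function trySaveScreenshot(
  storeScreenshotAsset: () => Promise<void>, // hypothetical: quota check + saveAsset
  warn: (msg: string) => void,
): Promise<boolean> {
  try {
    await storeScreenshotAsset();
    return true; // screenshot stored
  } catch (error) {
    if (error instanceof StorageQuotaError) {
      warn(`Skipping PDF screenshot due to quota exceeded: ${error.message}`);
      return true; // job still counts as completed; only the asset is skipped
    }
    throw error; // simplified: the real worker logs and handles these itself
  }
}
```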
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 7d92eb51..10106a3b 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -61,6 +61,10 @@ import {
zCrawlLinkRequestSchema,
} from "@karakeep/shared/queues";
import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+import {
+ checkStorageQuota,
+ StorageQuotaError,
+} from "@karakeep/trpc/lib/storageQuota";
import metascraperReddit from "../metascraper-plugins/metascraper-reddit";
@@ -435,16 +439,35 @@ async function storeScreenshot(
const assetId = newAssetId();
const contentType = "image/png";
const fileName = "screenshot.png";
- await saveAsset({
- userId,
- assetId,
- metadata: { contentType, fileName },
- asset: screenshot,
- });
- logger.info(
- `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`,
- );
- return { assetId, contentType, fileName, size: screenshot.byteLength };
+
+ // Check storage quota before saving the screenshot
+ try {
+ const quotaApproved = await checkStorageQuota(
+ db,
+ userId,
+ screenshot.byteLength,
+ );
+
+ await saveAsset({
+ userId,
+ assetId,
+ metadata: { contentType, fileName },
+ asset: screenshot,
+ quotaApproved,
+ });
+ logger.info(
+ `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`,
+ );
+ return { assetId, contentType, fileName, size: screenshot.byteLength };
+ } catch (error) {
+ if (error instanceof StorageQuotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${error.message}`,
+ );
+ return null;
+ }
+ throw error;
+ }
}
async function downloadAndStoreFile(
@@ -470,11 +493,19 @@ async function downloadAndStoreFile(
throw new Error("No content type in the response");
}
+ // Check storage quota before saving the asset
+ const quotaApproved = await checkStorageQuota(
+ db,
+ userId,
+ buffer.byteLength,
+ );
+
await saveAsset({
userId,
assetId,
metadata: { contentType },
asset: Buffer.from(buffer),
+ quotaApproved,
});
logger.info(
@@ -483,6 +514,12 @@ async function downloadAndStoreFile(
return { assetId, userId, contentType, size: buffer.byteLength };
} catch (e) {
+ if (e instanceof StorageQuotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${e.message}`,
+ );
+ return null;
+ }
logger.error(
`[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
);
@@ -523,24 +560,52 @@ async function archiveWebpage(
const contentType = "text/html";
- await saveAssetFromFile({
- userId,
- assetId,
- assetPath,
- metadata: {
- contentType,
- },
- });
+ // Get file size and check quota before saving
+ const stats = await fs.stat(assetPath);
+ const fileSize = stats.size;
- logger.info(
- `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`,
- );
+ try {
+ const quotaApproved = await checkStorageQuota(db, userId, fileSize);
- return {
- assetId,
- contentType,
- size: await getAssetSize({ userId, assetId }),
- };
+ await saveAssetFromFile({
+ userId,
+ assetId,
+ assetPath,
+ metadata: {
+ contentType,
+ },
+ quotaApproved,
+ });
+
+ logger.info(
+ `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`,
+ );
+
+ return {
+ assetId,
+ contentType,
+ size: await getAssetSize({ userId, assetId }),
+ };
+ } catch (error) {
+ if (error instanceof StorageQuotaError) {
+ logger.warn(
+ `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${error.message}`,
+ );
+ // Clean up the temporary file
+ try {
+ await fs.unlink(assetPath);
+ logger.info(
+ `[Crawler][${jobId}] Cleaned up temporary archive file: ${assetPath}`,
+ );
+ } catch (cleanupError) {
+ logger.warn(
+ `[Crawler][${jobId}] Failed to clean up temporary archive file: ${cleanupError}`,
+ );
+ }
+ return null;
+ }
+ throw error;
+ }
}
async function getContentType(
@@ -761,11 +826,7 @@ async function crawlAndParseUrl(
!precrawledArchiveAssetId &&
(serverConfig.crawler.fullPageArchive || archiveFullPage)
) {
- const {
- assetId: fullPageArchiveAssetId,
- size,
- contentType,
- } = await archiveWebpage(
+ const archiveResult = await archiveWebpage(
htmlContent,
browserUrl,
userId,
@@ -773,23 +834,31 @@ async function crawlAndParseUrl(
abortSignal,
);
- await db.transaction(async (txn) => {
- await updateAsset(
- oldFullPageArchiveAssetId,
- {
- id: fullPageArchiveAssetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE,
- contentType,
- size,
- fileName: null,
- },
- txn,
- );
- });
- if (oldFullPageArchiveAssetId) {
- silentDeleteAsset(userId, oldFullPageArchiveAssetId);
+ if (archiveResult) {
+ const {
+ assetId: fullPageArchiveAssetId,
+ size,
+ contentType,
+ } = archiveResult;
+
+ await db.transaction(async (txn) => {
+ await updateAsset(
+ oldFullPageArchiveAssetId,
+ {
+ id: fullPageArchiveAssetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_FULL_PAGE_ARCHIVE,
+ contentType,
+ size,
+ fileName: null,
+ },
+ txn,
+ );
+ });
+ if (oldFullPageArchiveAssetId) {
+ silentDeleteAsset(userId, oldFullPageArchiveAssetId);
+ }
}
}
};
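In the crawler, the storage helpers (`storeScreenshot`, `downloadAndStoreFile`, `archiveWebpage`) now resolve to `null` when the quota is exhausted instead of failing the crawl, and `crawlAndParseUrl` only updates the asset rows when a result is actually present, leaving any previous full-page archive in place. `archiveWebpage` additionally removes the temporary archive file it has already written to disk. A sketch of that best-effort cleanup, with `warn` as a hypothetical stand-in for the logger call:

```ts
import * as fs from "fs/promises";

// Sketch of the cleanup step archiveWebpage() performs when the quota check
// fails after the archive has already been written to disk: the temporary
// file is removed on a best-effort basis so skipped archives do not leak
// disk space.
async function removeTempArchive(
  assetPath: string,
  warn: (msg: string) => void, // hypothetical stand-in for logger.warn
): Promise<void> {
  try {
    await fs.unlink(assetPath);
  } catch (cleanupError) {
    // Failing to delete the temp file is only logged; it never fails the crawl.
    warn(`Failed to clean up temporary archive file: ${cleanupError}`);
  }
}
```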
diff --git a/apps/workers/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index ca591e6f..d25c1948 100644
--- a/apps/workers/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -8,7 +8,6 @@ import { db } from "@karakeep/db";
import { AssetTypes } from "@karakeep/db/schema";
import {
ASSET_TYPES,
- getAssetSize,
newAssetId,
saveAssetFromFile,
silentDeleteAsset,
@@ -20,6 +19,10 @@ import {
ZVideoRequest,
zvideoRequestSchema,
} from "@karakeep/shared/queues";
+import {
+ checkStorageQuota,
+ StorageQuotaError,
+} from "@karakeep/trpc/lib/storageQuota";
import { withTimeout } from "../utils";
import { getBookmarkDetails, updateAsset } from "../workerUtils";
@@ -140,32 +143,51 @@ async function runWorker(job: DequeuedJob<ZVideoRequest>) {
logger.info(
`[VideoCrawler][${jobId}] Finished downloading a file from "${url}" to "${assetPath}"`,
);
- await saveAssetFromFile({
- userId,
- assetId: videoAssetId,
- assetPath,
- metadata: { contentType: ASSET_TYPES.VIDEO_MP4 },
- });
-
- await db.transaction(async (txn) => {
- await updateAsset(
- oldVideoAssetId,
- {
- id: videoAssetId,
- bookmarkId,
- userId,
- assetType: AssetTypes.LINK_VIDEO,
- contentType: ASSET_TYPES.VIDEO_MP4,
- size: await getAssetSize({ userId, assetId: videoAssetId }),
- },
- txn,
- );
- });
- await silentDeleteAsset(userId, oldVideoAssetId);
- logger.info(
- `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`,
- );
+ // Get file size and check quota before saving
+ const stats = await fs.promises.stat(assetPath);
+ const fileSize = stats.size;
+
+ try {
+ const quotaApproved = await checkStorageQuota(db, userId, fileSize);
+
+ await saveAssetFromFile({
+ userId,
+ assetId: videoAssetId,
+ assetPath,
+ metadata: { contentType: ASSET_TYPES.VIDEO_MP4 },
+ quotaApproved,
+ });
+
+ await db.transaction(async (txn) => {
+ await updateAsset(
+ oldVideoAssetId,
+ {
+ id: videoAssetId,
+ bookmarkId,
+ userId,
+ assetType: AssetTypes.LINK_VIDEO,
+ contentType: ASSET_TYPES.VIDEO_MP4,
+ size: fileSize,
+ },
+ txn,
+ );
+ });
+ await silentDeleteAsset(userId, oldVideoAssetId);
+
+ logger.info(
+ `[VideoCrawler][${jobId}] Finished downloading video from "${url}" and adding it to the database`,
+ );
+ } catch (error) {
+ if (error instanceof StorageQuotaError) {
+ logger.warn(
+ `[VideoCrawler][${jobId}] Skipping video storage due to quota exceeded: ${error.message}`,
+ );
+ await deleteLeftOverAssetFile(jobId, videoAssetId);
+ return;
+ }
+ throw error;
+ }
}
/**
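The video worker follows the same pattern, with two details worth noting: the downloaded file is stat'ed once so the same byte count feeds both the quota check and the asset row's `size` column (replacing the removed `getAssetSize` lookup), and on a quota failure the leftover download is deleted via `deleteLeftOverAssetFile` before the job returns. A small sketch of the size lookup, assuming Node's built-in `fs` module as used above:

```ts
import * as fs from "fs";

// Sketch of the size handling in the video worker: stat the downloaded file
// once and reuse that number for both the quota check and the `size` column,
// instead of asking the asset store for the size after the upload.
async function downloadedFileSize(assetPath: string): Promise<number> {
  const stats = await fs.promises.stat(assetPath);
  return stats.size; // size of the downloaded video file in bytes
}
```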