diff options
Diffstat (limited to '')
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 240 | ||||
| -rw-r--r-- | packages/shared/tryCatch.ts | 24 |
2 files changed, 141 insertions, 123 deletions
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts index 140a3701..f22a68d6 100644 --- a/apps/workers/workers/crawlerWorker.ts +++ b/apps/workers/workers/crawlerWorker.ts @@ -63,11 +63,9 @@ import { VideoWorkerQueue, zCrawlLinkRequestSchema, } from "@karakeep/shared/queues"; +import { tryCatch } from "@karakeep/shared/tryCatch"; import { BookmarkTypes } from "@karakeep/shared/types/bookmarks"; -import { - checkStorageQuota, - StorageQuotaError, -} from "@karakeep/trpc/lib/storageQuota"; +import { checkStorageQuota } from "@karakeep/trpc/lib/storageQuota"; import metascraperReddit from "../metascraper-plugins/metascraper-reddit"; @@ -200,11 +198,10 @@ async function startBrowserInstance() { async function launchBrowser() { globalBrowser = undefined; await browserMutex.runExclusive(async () => { - try { - globalBrowser = await startBrowserInstance(); - } catch (e) { + const globalBrowserResult = await tryCatch(startBrowserInstance()); + if (globalBrowserResult.error) { logger.error( - `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${(e as Error).stack}`, + `[Crawler] Failed to connect to the browser instance, will retry in 5 secs: ${globalBrowserResult.error.stack}`, ); if (isShuttingDown) { logger.info("[Crawler] We're shutting down so won't retry."); @@ -215,6 +212,7 @@ async function launchBrowser() { }, 5000); return; } + globalBrowser = globalBrowserResult.data; globalBrowser?.on("disconnected", () => { if (isShuttingDown) { logger.info( @@ -234,20 +232,20 @@ export class CrawlerWorker { static async build() { chromium.use(StealthPlugin()); if (serverConfig.crawler.enableAdblocker) { - try { - logger.info("[crawler] Loading adblocker ..."); - globalBlocker = await PlaywrightBlocker.fromPrebuiltFull( - fetchWithProxy, - { - path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), - read: fs.readFile, - write: fs.writeFile, - }, - ); - } catch (e) { + logger.info("[crawler] Loading adblocker ..."); + const globalBlockerResult = await tryCatch( + PlaywrightBlocker.fromPrebuiltFull(fetchWithProxy, { + path: path.join(os.tmpdir(), "karakeep_adblocker.bin"), + read: fs.readFile, + write: fs.writeFile, + }), + ); + if (globalBlockerResult.error) { logger.error( - `[crawler] Failed to load adblocker. Will not be blocking ads: ${e}`, + `[crawler] Failed to load adblocker. Will not be blocking ads: ${globalBlockerResult.error}`, ); + } else { + globalBlocker = globalBlockerResult.data; } } if (!serverConfig.crawler.browserConnectOnDemand) { @@ -426,8 +424,8 @@ async function crawlPage( // Take a screenshot if configured let screenshot: Buffer | undefined = undefined; if (serverConfig.crawler.storeScreenshot) { - try { - screenshot = await Promise.race<Buffer>([ + const { data: screenshotData, error: screenshotError } = await tryCatch( + Promise.race<Buffer>([ page.screenshot({ // If you change this, you need to change the asset type in the store function. type: "png", @@ -442,14 +440,17 @@ async function crawlPage( serverConfig.crawler.screenshotTimeoutSec * 1000, ), ), - ]); + ]), + ); + if (screenshotError) { + logger.warn( + `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`, + ); + } else { logger.info( `[Crawler][${jobId}] Finished capturing page content and a screenshot. FullPageScreenshot: ${serverConfig.crawler.fullPageScreenshot}`, ); - } catch (e) { - logger.warn( - `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${e}`, - ); + screenshot = screenshotData; } } @@ -535,33 +536,28 @@ async function storeScreenshot( const fileName = "screenshot.png"; // Check storage quota before saving the screenshot - try { - const quotaApproved = await checkStorageQuota( - db, - userId, - screenshot.byteLength, - ); + const { data: quotaApproved, error: quotaError } = await tryCatch( + checkStorageQuota(db, userId, screenshot.byteLength), + ); - await saveAsset({ - userId, - assetId, - metadata: { contentType, fileName }, - asset: screenshot, - quotaApproved, - }); - logger.info( - `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${quotaError.message}`, ); - return { assetId, contentType, fileName, size: screenshot.byteLength }; - } catch (error) { - if (error instanceof StorageQuotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping screenshot storage due to quota exceeded: ${error.message}`, - ); - return null; - } - throw error; + return null; } + + await saveAsset({ + userId, + assetId, + metadata: { contentType, fileName }, + asset: screenshot, + quotaApproved, + }); + logger.info( + `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`, + ); + return { assetId, contentType, fileName, size: screenshot.byteLength }; } async function downloadAndStoreFile( @@ -588,12 +584,17 @@ async function downloadAndStoreFile( } // Check storage quota before saving the asset - const quotaApproved = await checkStorageQuota( - db, - userId, - buffer.byteLength, + const { data: quotaApproved, error: quotaError } = await tryCatch( + checkStorageQuota(db, userId, buffer.byteLength), ); + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${quotaError.message}`, + ); + return null; + } + await saveAsset({ userId, assetId, @@ -608,12 +609,6 @@ async function downloadAndStoreFile( return { assetId, userId, contentType, size: buffer.byteLength }; } catch (e) { - if (e instanceof StorageQuotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping ${fileType} storage due to quota exceeded: ${e.message}`, - ); - return null; - } logger.error( `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`, ); @@ -658,48 +653,47 @@ async function archiveWebpage( const stats = await fs.stat(assetPath); const fileSize = stats.size; - try { - const quotaApproved = await checkStorageQuota(db, userId, fileSize); - - await saveAssetFromFile({ - userId, - assetId, - assetPath, - metadata: { - contentType, - }, - quotaApproved, - }); + const { data: quotaApproved, error: quotaError } = await tryCatch( + checkStorageQuota(db, userId, fileSize), + ); - logger.info( - `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`, ); - return { - assetId, - contentType, - size: await getAssetSize({ userId, assetId }), - }; - } catch (error) { - if (error instanceof StorageQuotaError) { + const { error: unlinkError } = await tryCatch(fs.unlink(assetPath)); + if (unlinkError) { logger.warn( - `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${error.message}`, + `[Crawler][${jobId}] Failed to clean up temporary archive file: ${unlinkError}`, + ); + } else { + logger.info( + `[Crawler][${jobId}] Cleaned up temporary archive file: ${assetPath}`, ); - // Clean up the temporary file - try { - await fs.unlink(assetPath); - logger.info( - `[Crawler][${jobId}] Cleaned up temporary archive file: ${assetPath}`, - ); - } catch (cleanupError) { - logger.warn( - `[Crawler][${jobId}] Failed to clean up temporary archive file: ${cleanupError}`, - ); - } - return null; } - throw error; + return null; } + + await saveAssetFromFile({ + userId, + assetId, + assetPath, + metadata: { + contentType, + }, + quotaApproved, + }); + + logger.info( + `[Crawler][${jobId}] Done archiving the page as assetId: ${assetId}`, + ); + + return { + assetId, + contentType, + size: await getAssetSize({ userId, assetId }), + }; } async function getContentType( @@ -817,15 +811,20 @@ async function storeHtmlContent( return { result: "store_inline" }; } - try { - const quotaApproved = await checkStorageQuota( - db, - userId, - contentBuffer.byteLength, + const { data: quotaApproved, error: quotaError } = await tryCatch( + checkStorageQuota(db, userId, contentBuffer.byteLength), + ); + if (quotaError) { + logger.warn( + `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${quotaError.message}`, ); - const assetId = newAssetId(); + return { result: "not_stored" }; + } - await saveAsset({ + const assetId = newAssetId(); + + const { error: saveError } = await tryCatch( + saveAsset({ userId, assetId, asset: contentBuffer, @@ -834,29 +833,24 @@ async function storeHtmlContent( fileName: null, }, quotaApproved, - }); - - logger.info( - `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`, - ); - - return { - result: "stored", - assetId, - size: contentSize, - }; - } catch (error) { - if (error instanceof StorageQuotaError) { - logger.warn( - `[Crawler][${jobId}] Skipping HTML content storage due to quota exceeded: ${error.message}`, - ); - return { result: "not_stored" }; - } + }), + ); + if (saveError) { logger.error( - `[Crawler][${jobId}] Failed to store HTML content as asset: ${error}`, + `[Crawler][${jobId}] Failed to store HTML content as asset: ${saveError}`, ); - throw error; + throw saveError; } + + logger.info( + `[Crawler][${jobId}] Stored large HTML content (${contentSize} bytes) as asset: ${assetId}`, + ); + + return { + result: "stored", + assetId, + size: contentSize, + }; } async function crawlAndParseUrl( diff --git a/packages/shared/tryCatch.ts b/packages/shared/tryCatch.ts new file mode 100644 index 00000000..9b50a121 --- /dev/null +++ b/packages/shared/tryCatch.ts @@ -0,0 +1,24 @@ +// Types for the result object with discriminated union +interface Success<T> { + data: T; + error: null; +} + +interface Failure<E> { + data: null; + error: E; +} + +type Result<T, E = Error> = Success<T> | Failure<E>; + +// Main wrapper function +export async function tryCatch<T, E = Error>( + promise: Promise<T>, +): Promise<Result<T, E>> { + try { + const data = await promise; + return { data, error: null }; + } catch (error) { + return { data: null, error: error as E }; + } +} |
