| | | |
|---|---|---|
| author | Mohamed Bassem <me@mbassem.com> | 2025-02-01 18:16:25 +0000 |
| committer | Mohamed Bassem <me@mbassem.com> | 2025-02-01 18:16:25 +0000 |
| commit | fd7011aff5dd8ffde0fb10990da238f7baf9a814 (patch) | |
| tree | 99df3086a838ee33c40722d803c05c45a3a22ae3 | /apps/workers/crawlerWorker.ts |
| parent | 0893446bed6cca753549ee8e3cf090f2fcf11d9d (diff) | |
| download | karakeep-fd7011aff5dd8ffde0fb10990da238f7baf9a814.tar.zst | |
fix: Abort all IO when workers time out instead of detaching. Fixes #742
Diffstat (limited to 'apps/workers/crawlerWorker.ts')
| mode | file | lines |
|---|---|---|
| -rw-r--r-- | apps/workers/crawlerWorker.ts | 75 |

1 file changed, 62 insertions, 13 deletions
```diff
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 6bb4f4ac..91adb185 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -237,12 +237,16 @@ function validateUrl(url: string) {
   }
 }
 
-async function browserlessCrawlPage(jobId: string, url: string) {
+async function browserlessCrawlPage(
+  jobId: string,
+  url: string,
+  abortSignal: AbortSignal,
+) {
   logger.info(
     `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
   );
   const response = await fetch(url, {
-    signal: AbortSignal.timeout(5000),
+    signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
   });
   logger.info(
     `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
@@ -258,6 +262,7 @@ async function browserlessCrawlPage(jobId: string, url: string) {
 async function crawlPage(
   jobId: string,
   url: string,
+  abortSignal: AbortSignal,
 ): Promise<{
   htmlContent: string;
   screenshot: Buffer | undefined;
@@ -271,7 +276,7 @@ async function crawlPage(
     browser = globalBrowser;
   }
   if (!browser) {
-    return browserlessCrawlPage(jobId, url);
+    return browserlessCrawlPage(jobId, url, abortSignal);
   }
 
   const context = await browser.createBrowserContext();
@@ -412,10 +417,13 @@ async function downloadAndStoreFile(
   userId: string,
   jobId: string,
   fileType: string,
+  abortSignal: AbortSignal,
 ) {
   try {
     logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`);
-    const response = await fetch(url);
+    const response = await fetch(url, {
+      signal: abortSignal,
+    });
     if (!response.ok) {
       throw new Error(`Failed to download ${fileType}: ${response.status}`);
     }
@@ -451,6 +459,7 @@ async function downloadAndStoreImage(
   url: string,
   userId: string,
   jobId: string,
+  abortSignal: AbortSignal,
 ) {
   if (!serverConfig.crawler.downloadBannerImage) {
     logger.info(
@@ -458,7 +467,7 @@ async function downloadAndStoreImage(
     );
     return null;
   }
-  return downloadAndStoreFile(url, userId, jobId, "image");
+  return downloadAndStoreFile(url, userId, jobId, "image", abortSignal);
 }
 
 async function archiveWebpage(
@@ -466,6 +475,7 @@ async function archiveWebpage(
   url: string,
   userId: string,
   jobId: string,
+  abortSignal: AbortSignal,
 ) {
   logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
   const assetId = newAssetId();
@@ -473,6 +483,7 @@ async function archiveWebpage(
 
   await execa({
     input: html,
+    cancelSignal: abortSignal,
   })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
 
   const contentType = "text/html";
@@ -500,6 +511,7 @@ async function archiveWebpage(
 async function getContentType(
   url: string,
   jobId: string,
+  abortSignal: AbortSignal,
 ): Promise<string | null> {
   try {
     logger.info(
@@ -507,7 +519,7 @@ async function getContentType(
     );
     const response = await fetch(url, {
       method: "HEAD",
-      signal: AbortSignal.timeout(5000),
+      signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
     });
     const contentType = response.headers.get("content-type");
     logger.info(
@@ -536,8 +548,15 @@ async function handleAsAssetBookmark(
   userId: string,
   jobId: string,
   bookmarkId: string,
+  abortSignal: AbortSignal,
 ) {
-  const downloaded = await downloadAndStoreFile(url, userId, jobId, assetType);
+  const downloaded = await downloadAndStoreFile(
+    url,
+    userId,
+    jobId,
+    assetType,
+    abortSignal,
+  );
   if (!downloaded) {
     return;
   }
@@ -586,6 +605,7 @@ async function crawlAndParseUrl(
   oldFullPageArchiveAssetId: string | undefined,
   precrawledArchiveAssetId: string | undefined,
   archiveFullPage: boolean,
+  abortSignal: AbortSignal,
 ) {
   let result: {
     htmlContent: string;
@@ -609,8 +629,9 @@ async function crawlAndParseUrl(
       url,
     };
   } else {
-    result = await crawlPage(jobId, url);
+    result = await crawlPage(jobId, url, abortSignal);
   }
+  abortSignal.throwIfAborted();
 
   const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
 
@@ -619,9 +640,15 @@ async function crawlAndParseUrl(
     extractReadableContent(htmlContent, browserUrl, jobId),
     storeScreenshot(screenshot, userId, jobId),
   ]);
+  abortSignal.throwIfAborted();
   let imageAssetInfo: DBAssetType | null = null;
   if (meta.image) {
-    const downloaded = await downloadAndStoreImage(meta.image, userId, jobId);
+    const downloaded = await downloadAndStoreImage(
+      meta.image,
+      userId,
+      jobId,
+      abortSignal,
+    );
     if (downloaded) {
       imageAssetInfo = {
         id: downloaded.assetId,
@@ -633,6 +660,7 @@ async function crawlAndParseUrl(
       };
     }
   }
+  abortSignal.throwIfAborted();
 
   // TODO(important): Restrict the size of content to store
   await db.transaction(async (txn) => {
@@ -682,7 +710,13 @@ async function crawlAndParseUrl(
       assetId: fullPageArchiveAssetId,
       size,
       contentType,
-    } = await archiveWebpage(htmlContent, browserUrl, userId, jobId);
+    } = await archiveWebpage(
+      htmlContent,
+      browserUrl,
+      userId,
+      jobId,
+      abortSignal,
+    );
 
     await db.transaction(async (txn) => {
       await updateAsset(
@@ -732,19 +766,33 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   );
 
   validateUrl(url);
-  const contentType = await getContentType(url, jobId);
+  const contentType = await getContentType(url, jobId, job.abortSignal);
 
   // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
   const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
 
   if (isPdf) {
-    await handleAsAssetBookmark(url, "pdf", userId, jobId, bookmarkId);
+    await handleAsAssetBookmark(
+      url,
+      "pdf",
+      userId,
+      jobId,
+      bookmarkId,
+      job.abortSignal,
+    );
   } else if (
     contentType &&
     IMAGE_ASSET_TYPES.has(contentType) &&
     SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType)
   ) {
-    await handleAsAssetBookmark(url, "image", userId, jobId, bookmarkId);
+    await handleAsAssetBookmark(
+      url,
+      "image",
+      userId,
+      jobId,
+      bookmarkId,
+      job.abortSignal,
+    );
   } else {
     const archivalLogic = await crawlAndParseUrl(
       url,
@@ -756,6 +804,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
       oldFullPageArchiveAssetId,
       precrawledArchiveAssetId,
       archiveFullPage,
+      job.abortSignal,
     );
 
     // Enqueue openai job (if not set, assume it's true for backward compatibility)
```
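The recurring pattern in the patch: every piece of IO now races the job-wide abort signal, and the pipeline checkpoints with `throwIfAborted()` between stages, so a timed-out job stops doing work instead of being detached and left running. A minimal sketch of that pattern (the helper names below are hypothetical, not from the codebase; `AbortSignal.any`, `AbortSignal.timeout`, and `throwIfAborted()` are standard Web/Node APIs):

```typescript
// Sketch only: illustrates the signal-composition pattern used in the diff.
// fetchWithJobSignal and runStages are hypothetical helpers, not karakeep code.

async function fetchWithJobSignal(
  url: string,
  abortSignal: AbortSignal,
): Promise<Response> {
  // Race a local 5s timeout against the worker's job-wide signal:
  // whichever aborts first cancels the request.
  return fetch(url, {
    signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
  });
}

async function runStages(url: string, abortSignal: AbortSignal) {
  const response = await fetchWithJobSignal(url, abortSignal);
  const html = await response.text();

  // Checkpoint between stages: if the job has already been aborted,
  // throw its abort reason here rather than continuing with downloads,
  // screenshots, or archiving.
  abortSignal.throwIfAborted();

  return html;
}
```

In the worker itself the signal comes from the dequeued job (`job.abortSignal`), so a single timeout tears down the plain HTTP fetches, the crawl, the asset downloads, and the archive step together.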

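The external `monolith` archiver is covered by the same signal through execa's `cancelSignal` option, as the `archiveWebpage` hunk shows. A standalone sketch of that piece, assuming an execa version that supports `cancelSignal` (the option name is taken from the patch itself):

```typescript
import { execa } from "execa";

// Sketch only: mirrors the archiveWebpage hunk. When abortSignal fires,
// execa kills the monolith child process instead of leaving it running
// after the worker has given up on the job.
async function archiveWithMonolith(
  html: string,
  url: string,
  outputPath: string,
  abortSignal: AbortSignal,
) {
  await execa({
    input: html,
    cancelSignal: abortSignal,
  })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", outputPath]);
}
```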