| | | |
|---|---|---|
| author | Mohamed Bassem <me@mbassem.com> | 2025-09-28 15:54:08 +0100 |
| committer | GitHub <noreply@github.com> | 2025-09-28 15:54:08 +0100 |
| commit | 9eecda184018a0f50cd42b9b791f5c4efc6024fd (patch) | |
| tree | b0cc16526fd9af345df37615ad5cdc2528651410 /apps | |
| parent | 8dd84ef58b8da920f3e7718cfb5129a44437e53d (diff) | |
| download | karakeep-9eecda184018a0f50cd42b9b791f5c4efc6024fd.tar.zst | |
fix: Abort dangling processing when crawler is aborted (#1988)
* fix: Abort dangling processing when crawler is aborted
* comments
* report the size
* handle unhandled rejection
* drop promisify
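
The core of the fix is a helper that converts an `AbortSignal` into a promise that rejects on abort, with a no-op `.catch()` attached so the losing branch of a `Promise.race` never surfaces as an `unhandledRejection`. Here is a minimal standalone sketch of that pattern as it appears in the diff below; the `doStep` caller is hypothetical, added only to illustrate usage:

```ts
function abortPromise(signal: AbortSignal): Promise<never> {
  if (signal.aborted) {
    // Already aborted: reject immediately, but attach a no-op catch so the
    // rejection counts as handled even if nobody ends up awaiting this promise.
    const p = Promise.reject(signal.reason ?? new Error("AbortError"));
    p.catch(() => {});
    return p;
  }
  const p = new Promise<never>((_, reject) => {
    signal.addEventListener(
      "abort",
      () => reject(signal.reason ?? new Error("AbortError")),
      { once: true },
    );
  });
  p.catch(() => {}); // same unhandledRejection suppression for the pending case
  return p;
}

// Hypothetical caller: race a slow step against the signal, then re-check,
// since the work may still have won the race after an abort was requested.
async function doStep(signal: AbortSignal): Promise<string> {
  const work = new Promise<string>((resolve) =>
    setTimeout(() => resolve("done"), 10_000),
  );
  const result = await Promise.race([work, abortPromise(signal)]);
  signal.throwIfAborted();
  return result;
}
```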
Diffstat (limited to 'apps')
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 125 |
1 file changed, 98 insertions, 27 deletions
```diff
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index d8e75aba..65130a01 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -1,7 +1,10 @@
 import * as dns from "dns";
 import { promises as fs } from "fs";
+import * as fsSync from "fs";
 import * as path from "node:path";
 import * as os from "os";
+import { Transform } from "stream";
+import { pipeline } from "stream/promises";
 import { PlaywrightBlocker } from "@ghostery/adblocker-playwright";
 import { Readability } from "@mozilla/readability";
 import { Mutex } from "async-mutex";
@@ -72,6 +75,31 @@ import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
 
 import metascraperReddit from "../metascraper-plugins/metascraper-reddit";
 
+function abortPromise(signal: AbortSignal): Promise<never> {
+  if (signal.aborted) {
+    const p = Promise.reject(signal.reason ?? new Error("AbortError"));
+    p.catch(() => {
+      /* empty */
+    }); // suppress unhandledRejection if not awaited
+    return p;
+  }
+
+  const p = new Promise<never>((_, reject) => {
+    signal.addEventListener(
+      "abort",
+      () => {
+        reject(signal.reason ?? new Error("AbortError"));
+      },
+      { once: true },
+    );
+  });
+
+  p.catch(() => {
+    /* empty */
+  });
+  return p;
+}
+
 /**
  * Normalize a Content-Type header by stripping parameters (e.g., charset)
  * and lowercasing the media type, so comparisons against supported types work.
@@ -407,6 +435,7 @@ async function crawlPage(
       "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
     proxy: getPlaywrightProxyConfig(),
   });
+
   try {
     if (globalCookies.length > 0) {
       await context.addCookies(globalCookies);
@@ -425,10 +454,13 @@ async function crawlPage(
 
     // Navigate to the target URL
     logger.info(`[Crawler][${jobId}] Navigating to "${url}"`);
-    const response = await page.goto(url, {
-      timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
-      waitUntil: "domcontentloaded",
-    });
+    const response = await Promise.race([
+      page.goto(url, {
+        timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
+        waitUntil: "domcontentloaded",
+      }),
+      abortPromise(abortSignal).then(() => null),
+    ]);
 
     logger.info(
       `[Crawler][${jobId}] Successfully navigated to "${url}". Waiting for the page to load ...`,
@@ -438,12 +470,18 @@ async function crawlPage(
     await Promise.race([
       page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => ({})),
       new Promise((resolve) => setTimeout(resolve, 5000)),
+      abortPromise(abortSignal),
     ]);
 
+    abortSignal.throwIfAborted();
+
     logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`);
 
     // Extract content from the page
     const htmlContent = await page.content();
+
+    abortSignal.throwIfAborted();
+
     logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`);
 
     // Take a screenshot if configured
@@ -465,8 +503,10 @@ async function crawlPage(
             serverConfig.crawler.screenshotTimeoutSec * 1000,
           ),
         ),
+        abortPromise(abortSignal).then(() => Buffer.from("")),
       ]),
     );
+    abortSignal.throwIfAborted();
     if (screenshotError) {
       logger.warn(
         `[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`,
@@ -580,7 +620,7 @@ async function storeScreenshot(
     quotaApproved,
   });
   logger.info(
-    `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`,
+    `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`,
   );
   return { assetId, contentType, fileName, size: screenshot.byteLength };
 }
@@ -592,6 +632,7 @@ async function downloadAndStoreFile(
   fileType: string,
   abortSignal: AbortSignal,
 ) {
+  let assetPath: string | undefined;
   try {
     logger.info(
       `[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`,
@@ -599,11 +640,9 @@ async function downloadAndStoreFile(
     );
     const response = await fetchWithProxy(url, {
       signal: abortSignal,
     });
-    if (!response.ok) {
+    if (!response.ok || response.body == null) {
       throw new Error(`Failed to download ${fileType}: ${response.status}`);
     }
-    const buffer = await response.arrayBuffer();
-    const assetId = newAssetId();
     const contentType = normalizeContentType(
       response.headers.get("content-type"),
@@ -612,9 +651,40 @@ async function downloadAndStoreFile(
       throw new Error("No content type in the response");
     }
 
+    const assetId = newAssetId();
+    assetPath = path.join(os.tmpdir(), assetId);
+
+    let bytesRead = 0;
+    const contentLengthEnforcer = new Transform({
+      transform(chunk, _, callback) {
+        bytesRead += chunk.length;
+
+        if (abortSignal.aborted) {
+          callback(new Error("AbortError"));
+        } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) {
+          callback(
+            new Error(
+              `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`,
+            ),
+          );
+        } else {
+          callback(null, chunk); // pass data along unchanged
+        }
+      },
+      flush(callback) {
+        callback();
+      },
+    });
+
+    await pipeline(
+      response.body,
+      contentLengthEnforcer,
+      fsSync.createWriteStream(assetPath),
+    );
+
     // Check storage quota before saving the asset
     const { data: quotaApproved, error: quotaError } = await tryCatch(
-      QuotaService.checkStorageQuota(db, userId, buffer.byteLength),
+      QuotaService.checkStorageQuota(db, userId, bytesRead),
     );
 
     if (quotaError) {
@@ -624,24 +694,28 @@ async function downloadAndStoreFile(
       return null;
     }
 
-    await saveAsset({
+    await saveAssetFromFile({
       userId,
       assetId,
       metadata: { contentType },
-      asset: Buffer.from(buffer),
+      assetPath,
       quotaApproved,
     });
 
     logger.info(
-      `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId}`,
+      `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`,
    );
 
-    return { assetId, userId, contentType, size: buffer.byteLength };
+    return { assetId, userId, contentType, size: bytesRead };
   } catch (e) {
     logger.error(
       `[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
     );
     return null;
+  } finally {
+    if (assetPath) {
+      await tryCatch(fs.unlink(assetPath));
+    }
   }
 }
@@ -669,7 +743,7 @@ async function archiveWebpage(
 ) {
   logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
   const assetId = newAssetId();
-  const assetPath = `/tmp/${assetId}`;
+  const assetPath = path.join(os.tmpdir(), assetId);
 
   let res = await execa({
     input: html,
@@ -706,17 +780,7 @@ async function archiveWebpage(
     logger.warn(
       `[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`,
     );
-
-    const { error: unlinkError } = await tryCatch(fs.unlink(assetPath));
-    if (unlinkError) {
-      logger.warn(
-        `[Crawler][${jobId}] Failed to clean up temporary archive file: ${unlinkError}`,
-      );
-    } else {
-      logger.info(
-        `[Crawler][${jobId}] Cleaned up temporary archive file: ${assetPath}`,
-      );
-    }
+    await tryCatch(fs.unlink(assetPath));
     return null;
   }
 
@@ -940,12 +1004,18 @@
 
   const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
 
-  const [meta, readableContent, screenshotAssetInfo] = await Promise.all([
+  const abortableWork = Promise.all([
     extractMetadata(htmlContent, browserUrl, jobId),
     extractReadableContent(htmlContent, browserUrl, jobId),
     storeScreenshot(screenshot, userId, jobId),
   ]);
 
+  await Promise.race([abortableWork, abortPromise(abortSignal)]);
+
+  const [meta, readableContent, screenshotAssetInfo] = await abortableWork;
+
+  abortSignal.throwIfAborted();
+
   const htmlContentAssetInfo = await storeHtmlContent(
     readableContent?.content,
     userId,
@@ -1101,7 +1171,7 @@
 }
 
 async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
-  const jobId = job.id ?? "unknown";
+  const jobId = `${job.id}:${job.runNumber}`;
   const request = zCrawlLinkRequestSchema.safeParse(job.data);
 
   if (!request.success) {
@@ -1128,6 +1198,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   validateUrl(url);
 
   const contentType = await getContentType(url, jobId, job.abortSignal);
+  job.abortSignal.throwIfAborted();
 
   // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
   const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
```
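
For reference, the download path no longer buffers the whole response via `response.arrayBuffer()`; it streams the body to a temp file through a byte-counting `Transform` that fails fast on abort or once the configured size cap is exceeded, so an oversized download never sits fully in memory. A standalone sketch of the same pattern follows; `MAX_BYTES`, `sizeCap`, and `downloadToFile` are illustrative stand-ins, not names from the codebase:

```ts
import { createWriteStream } from "fs";
import { Transform } from "stream";
import { pipeline } from "stream/promises";

const MAX_BYTES = 50 * 1024 * 1024; // stand-in for serverConfig.maxAssetSizeMb

// A Transform that counts bytes and errors out mid-stream when the signal is
// aborted or the cap is exceeded, instead of buffering the whole body first.
function sizeCap(maxBytes: number, signal: AbortSignal): Transform {
  let bytesRead = 0;
  return new Transform({
    transform(chunk: Buffer, _encoding, callback) {
      bytesRead += chunk.length;
      if (signal.aborted) {
        callback(new Error("AbortError"));
      } else if (bytesRead > maxBytes) {
        callback(new Error(`Content length exceeds ${maxBytes} bytes`));
      } else {
        callback(null, chunk); // pass the chunk through unchanged
      }
    },
  });
}

// Hypothetical caller: stream a fetch body to disk under the cap.
async function downloadToFile(
  url: string,
  destPath: string,
  signal: AbortSignal,
): Promise<void> {
  const response = await fetch(url, { signal });
  if (!response.ok || response.body == null) {
    throw new Error(`Failed to download: ${response.status}`);
  }
  // Node's stream/promises pipeline accepts web ReadableStreams (Node >= 17),
  // which is what the diff relies on when piping response.body to disk.
  await pipeline(response.body, sizeCap(MAX_BYTES, signal), createWriteStream(destPath));
}
```

A failed `pipeline` rejects and destroys the write stream, which is why the diff pairs it with a `finally` block that unlinks the temp file: the partial download is cleaned up whether the transfer finished, aborted, or blew past the cap.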
