author     Mohamed Bassem <me@mbassem.com>  2025-09-28 15:54:08 +0100
committer  GitHub <noreply@github.com>      2025-09-28 15:54:08 +0100
commit     9eecda184018a0f50cd42b9b791f5c4efc6024fd (patch)
tree       b0cc16526fd9af345df37615ad5cdc2528651410
parent     8dd84ef58b8da920f3e7718cfb5129a44437e53d (diff)
download   karakeep-9eecda184018a0f50cd42b9b791f5c4efc6024fd.tar.zst
fix: Abort dangling processing when crawler is aborted (#1988)
* fix: Abort dangling processing when crawler is aborted
* comments
* report the size
* handle unhandled rejection
* drop promisify
-rw-r--r--  apps/workers/workers/crawlerWorker.ts  125
1 file changed, 98 insertions, 27 deletions
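
The core of the fix is a small abortPromise() helper that turns the job's AbortSignal into a promise that rejects on abort, so long-running crawl steps (navigation, screenshotting, metadata extraction) can be raced against it instead of dangling after the job has been cancelled. Below is a minimal sketch of how the helper introduced by this patch is meant to be used; slowCrawlStep is a hypothetical stand-in for the real steps such as page.goto or page.content.

// Sketch only (not part of the diff): usage of the abortPromise() helper.
function abortPromise(signal: AbortSignal): Promise<never> {
  if (signal.aborted) {
    const p = Promise.reject(signal.reason ?? new Error("AbortError"));
    p.catch(() => {}); // swallow so an un-awaited rejection does not crash the process
    return p;
  }
  const p = new Promise<never>((_, reject) => {
    signal.addEventListener(
      "abort",
      () => reject(signal.reason ?? new Error("AbortError")),
      { once: true },
    );
  });
  p.catch(() => {}); // same reason: the loser of a Promise.race is never awaited
  return p;
}

// slowCrawlStep is hypothetical; it stands in for the worker's real crawl steps.
async function slowCrawlStep(signal: AbortSignal): Promise<string> {
  const work = new Promise<string>((resolve) => setTimeout(() => resolve("done"), 10_000));
  // The race settles as soon as either the work finishes or the job is aborted.
  const result = await Promise.race([work, abortPromise(signal)]);
  signal.throwIfAborted(); // defensive re-check, mirroring the patch
  return result;
}

Note the unconditional .catch(() => {}) on the abort promise: the losing side of a Promise.race is never awaited, so without it an abort would surface as an unhandledRejection (the "handle unhandled rejection" item in the commit message).
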
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index d8e75aba..65130a01 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -1,7 +1,10 @@
import * as dns from "dns";
import { promises as fs } from "fs";
+import * as fsSync from "fs";
import * as path from "node:path";
import * as os from "os";
+import { Transform } from "stream";
+import { pipeline } from "stream/promises";
import { PlaywrightBlocker } from "@ghostery/adblocker-playwright";
import { Readability } from "@mozilla/readability";
import { Mutex } from "async-mutex";
@@ -72,6 +75,31 @@ import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
import metascraperReddit from "../metascraper-plugins/metascraper-reddit";
+function abortPromise(signal: AbortSignal): Promise<never> {
+ if (signal.aborted) {
+ const p = Promise.reject(signal.reason ?? new Error("AbortError"));
+ p.catch(() => {
+ /* empty */
+ }); // suppress unhandledRejection if not awaited
+ return p;
+ }
+
+ const p = new Promise<never>((_, reject) => {
+ signal.addEventListener(
+ "abort",
+ () => {
+ reject(signal.reason ?? new Error("AbortError"));
+ },
+ { once: true },
+ );
+ });
+
+ p.catch(() => {
+ /* empty */
+ });
+ return p;
+}
+
/**
* Normalize a Content-Type header by stripping parameters (e.g., charset)
* and lowercasing the media type, so comparisons against supported types work.
@@ -407,6 +435,7 @@ async function crawlPage(
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36",
proxy: getPlaywrightProxyConfig(),
});
+
try {
if (globalCookies.length > 0) {
await context.addCookies(globalCookies);
@@ -425,10 +454,13 @@ async function crawlPage(
// Navigate to the target URL
logger.info(`[Crawler][${jobId}] Navigating to "${url}"`);
- const response = await page.goto(url, {
- timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
- waitUntil: "domcontentloaded",
- });
+ const response = await Promise.race([
+ page.goto(url, {
+ timeout: serverConfig.crawler.navigateTimeoutSec * 1000,
+ waitUntil: "domcontentloaded",
+ }),
+ abortPromise(abortSignal).then(() => null),
+ ]);
logger.info(
`[Crawler][${jobId}] Successfully navigated to "${url}". Waiting for the page to load ...`,
@@ -438,12 +470,18 @@ async function crawlPage(
await Promise.race([
page.waitForLoadState("networkidle", { timeout: 5000 }).catch(() => ({})),
new Promise((resolve) => setTimeout(resolve, 5000)),
+ abortPromise(abortSignal),
]);
+ abortSignal.throwIfAborted();
+
logger.info(`[Crawler][${jobId}] Finished waiting for the page to load.`);
// Extract content from the page
const htmlContent = await page.content();
+
+ abortSignal.throwIfAborted();
+
logger.info(`[Crawler][${jobId}] Successfully fetched the page content.`);
// Take a screenshot if configured
@@ -465,8 +503,10 @@ async function crawlPage(
serverConfig.crawler.screenshotTimeoutSec * 1000,
),
),
+ abortPromise(abortSignal).then(() => Buffer.from("")),
]),
);
+ abortSignal.throwIfAborted();
if (screenshotError) {
logger.warn(
`[Crawler][${jobId}] Failed to capture the screenshot. Reason: ${screenshotError}`,
@@ -580,7 +620,7 @@ async function storeScreenshot(
quotaApproved,
});
logger.info(
- `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId}`,
+ `[Crawler][${jobId}] Stored the screenshot as assetId: ${assetId} (${screenshot.byteLength} bytes)`,
);
return { assetId, contentType, fileName, size: screenshot.byteLength };
}
@@ -592,6 +632,7 @@ async function downloadAndStoreFile(
fileType: string,
abortSignal: AbortSignal,
) {
+ let assetPath: string | undefined;
try {
logger.info(
`[Crawler][${jobId}] Downloading ${fileType} from "${url.length > 100 ? url.slice(0, 100) + "..." : url}"`,
@@ -599,11 +640,9 @@ async function downloadAndStoreFile(
const response = await fetchWithProxy(url, {
signal: abortSignal,
});
- if (!response.ok) {
+ if (!response.ok || response.body == null) {
throw new Error(`Failed to download ${fileType}: ${response.status}`);
}
- const buffer = await response.arrayBuffer();
- const assetId = newAssetId();
const contentType = normalizeContentType(
response.headers.get("content-type"),
@@ -612,9 +651,40 @@ async function downloadAndStoreFile(
throw new Error("No content type in the response");
}
+ const assetId = newAssetId();
+ assetPath = path.join(os.tmpdir(), assetId);
+
+ let bytesRead = 0;
+ const contentLengthEnforcer = new Transform({
+ transform(chunk, _, callback) {
+ bytesRead += chunk.length;
+
+ if (abortSignal.aborted) {
+ callback(new Error("AbortError"));
+ } else if (bytesRead > serverConfig.maxAssetSizeMb * 1024 * 1024) {
+ callback(
+ new Error(
+ `Content length exceeds maximum allowed size: ${serverConfig.maxAssetSizeMb}MB`,
+ ),
+ );
+ } else {
+ callback(null, chunk); // pass data along unchanged
+ }
+ },
+ flush(callback) {
+ callback();
+ },
+ });
+
+ await pipeline(
+ response.body,
+ contentLengthEnforcer,
+ fsSync.createWriteStream(assetPath),
+ );
+
// Check storage quota before saving the asset
const { data: quotaApproved, error: quotaError } = await tryCatch(
- QuotaService.checkStorageQuota(db, userId, buffer.byteLength),
+ QuotaService.checkStorageQuota(db, userId, bytesRead),
);
if (quotaError) {
@@ -624,24 +694,28 @@ async function downloadAndStoreFile(
return null;
}
- await saveAsset({
+ await saveAssetFromFile({
userId,
assetId,
metadata: { contentType },
- asset: Buffer.from(buffer),
+ assetPath,
quotaApproved,
});
logger.info(
- `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId}`,
+ `[Crawler][${jobId}] Downloaded ${fileType} as assetId: ${assetId} (${bytesRead} bytes)`,
);
- return { assetId, userId, contentType, size: buffer.byteLength };
+ return { assetId, userId, contentType, size: bytesRead };
} catch (e) {
logger.error(
`[Crawler][${jobId}] Failed to download and store ${fileType}: ${e}`,
);
return null;
+ } finally {
+ if (assetPath) {
+ await tryCatch(fs.unlink(assetPath));
+ }
}
}
@@ -669,7 +743,7 @@ async function archiveWebpage(
) {
logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
const assetId = newAssetId();
- const assetPath = `/tmp/${assetId}`;
+ const assetPath = path.join(os.tmpdir(), assetId);
let res = await execa({
input: html,
@@ -706,17 +780,7 @@ async function archiveWebpage(
logger.warn(
`[Crawler][${jobId}] Skipping page archive storage due to quota exceeded: ${quotaError.message}`,
);
-
- const { error: unlinkError } = await tryCatch(fs.unlink(assetPath));
- if (unlinkError) {
- logger.warn(
- `[Crawler][${jobId}] Failed to clean up temporary archive file: ${unlinkError}`,
- );
- } else {
- logger.info(
- `[Crawler][${jobId}] Cleaned up temporary archive file: ${assetPath}`,
- );
- }
+ await tryCatch(fs.unlink(assetPath));
return null;
}
@@ -940,12 +1004,18 @@ async function crawlAndParseUrl(
const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
- const [meta, readableContent, screenshotAssetInfo] = await Promise.all([
+ const abortableWork = Promise.all([
extractMetadata(htmlContent, browserUrl, jobId),
extractReadableContent(htmlContent, browserUrl, jobId),
storeScreenshot(screenshot, userId, jobId),
]);
+ await Promise.race([abortableWork, abortPromise(abortSignal)]);
+
+ const [meta, readableContent, screenshotAssetInfo] = await abortableWork;
+
+ abortSignal.throwIfAborted();
+
const htmlContentAssetInfo = await storeHtmlContent(
readableContent?.content,
userId,
@@ -1101,7 +1171,7 @@ async function crawlAndParseUrl(
}
async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
- const jobId = job.id ?? "unknown";
+ const jobId = `${job.id}:${job.runNumber}`;
const request = zCrawlLinkRequestSchema.safeParse(job.data);
if (!request.success) {
@@ -1128,6 +1198,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
validateUrl(url);
const contentType = await getContentType(url, jobId, job.abortSignal);
+ job.abortSignal.throwIfAborted();
// Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
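
For the asset download path, the patch stops buffering the whole response with arrayBuffer(): the body is streamed through a byte-counting Transform into a temp file, so the size cap and the abort signal are checked per chunk and the asset never has to fit in memory. The following is a condensed, self-contained sketch of that streaming pattern under stated assumptions; downloadToTempFile and MAX_BYTES are placeholders for the worker's serverConfig.maxAssetSizeMb limit and its saveAssetFromFile/cleanup handling.

import { createWriteStream } from "fs";
import { tmpdir } from "os";
import { join } from "path";
import { Readable, Transform } from "stream";
import { pipeline } from "stream/promises";

const MAX_BYTES = 50 * 1024 * 1024; // placeholder; the worker uses serverConfig.maxAssetSizeMb

async function downloadToTempFile(
  body: Readable, // the HTTP response body for the asset
  signal: AbortSignal,
): Promise<{ path: string; size: number }> {
  const assetPath = join(tmpdir(), `asset-${Date.now()}`);
  let bytesRead = 0;

  const enforcer = new Transform({
    transform(chunk: Buffer, _encoding, callback) {
      bytesRead += chunk.length;
      if (signal.aborted) {
        callback(new Error("AbortError")); // stop streaming as soon as the job is aborted
      } else if (bytesRead > MAX_BYTES) {
        callback(new Error(`Content length exceeds ${MAX_BYTES} bytes`)); // fail before writing more
      } else {
        callback(null, chunk); // pass the chunk through unchanged
      }
    },
  });

  await pipeline(body, enforcer, createWriteStream(assetPath));
  return { path: assetPath, size: bytesRead };
}

As in the patch, the caller is expected to unlink the temp file in a finally block once the asset has been persisted, so a failed or aborted download leaves nothing behind in os.tmpdir().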