path: root/apps/workers/crawlerWorker.ts
Diffstat (limited to 'apps/workers/crawlerWorker.ts')
-rw-r--r--  apps/workers/crawlerWorker.ts | 75
1 file changed, 62 insertions(+), 13 deletions(-)
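
The commit threads a per-job AbortSignal through the crawler: one-off fetches combine their 5-second timeout with the job signal via AbortSignal.any, downloads and the monolith archival step receive the signal directly, and crawlAndParseUrl calls abortSignal.throwIfAborted() between expensive steps so a cancelled job stops early. A minimal sketch of that pattern (assumes a runtime with AbortSignal.any, i.e. Node 20.3+; the helper names below are illustrative and not part of this file):

async function fetchWithJobSignal(
  url: string,
  jobSignal: AbortSignal,
  timeoutMs = 5000,
): Promise<Response> {
  // Abort when either the per-request timeout fires or the job itself is cancelled.
  const signal = AbortSignal.any([AbortSignal.timeout(timeoutMs), jobSignal]);
  return fetch(url, { signal });
}

async function crawlSteps(url: string, jobSignal: AbortSignal): Promise<string> {
  const response = await fetchWithJobSignal(url, jobSignal);
  const html = await response.text();
  // Checkpoint between expensive steps: throws immediately if the job was cancelled meanwhile.
  jobSignal.throwIfAborted();
  return html;
}
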
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 6bb4f4ac..91adb185 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -237,12 +237,16 @@ function validateUrl(url: string) {
}
}
-async function browserlessCrawlPage(jobId: string, url: string) {
+async function browserlessCrawlPage(
+ jobId: string,
+ url: string,
+ abortSignal: AbortSignal,
+) {
logger.info(
`[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
);
const response = await fetch(url, {
- signal: AbortSignal.timeout(5000),
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
});
logger.info(
`[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
@@ -258,6 +262,7 @@ async function browserlessCrawlPage(jobId: string, url: string) {
async function crawlPage(
jobId: string,
url: string,
+ abortSignal: AbortSignal,
): Promise<{
htmlContent: string;
screenshot: Buffer | undefined;
@@ -271,7 +276,7 @@ async function crawlPage(
browser = globalBrowser;
}
if (!browser) {
- return browserlessCrawlPage(jobId, url);
+ return browserlessCrawlPage(jobId, url, abortSignal);
}
const context = await browser.createBrowserContext();
@@ -412,10 +417,13 @@ async function downloadAndStoreFile(
userId: string,
jobId: string,
fileType: string,
+ abortSignal: AbortSignal,
) {
try {
logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`);
- const response = await fetch(url);
+ const response = await fetch(url, {
+ signal: abortSignal,
+ });
if (!response.ok) {
throw new Error(`Failed to download ${fileType}: ${response.status}`);
}
@@ -451,6 +459,7 @@ async function downloadAndStoreImage(
url: string,
userId: string,
jobId: string,
+ abortSignal: AbortSignal,
) {
if (!serverConfig.crawler.downloadBannerImage) {
logger.info(
@@ -458,7 +467,7 @@ async function downloadAndStoreImage(
);
return null;
}
- return downloadAndStoreFile(url, userId, jobId, "image");
+ return downloadAndStoreFile(url, userId, jobId, "image", abortSignal);
}
async function archiveWebpage(
@@ -466,6 +475,7 @@ async function archiveWebpage(
url: string,
userId: string,
jobId: string,
+ abortSignal: AbortSignal,
) {
logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
const assetId = newAssetId();
@@ -473,6 +483,7 @@ async function archiveWebpage(
await execa({
input: html,
+ cancelSignal: abortSignal,
})("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
const contentType = "text/html";
@@ -500,6 +511,7 @@ async function archiveWebpage(
async function getContentType(
url: string,
jobId: string,
+ abortSignal: AbortSignal,
): Promise<string | null> {
try {
logger.info(
@@ -507,7 +519,7 @@ async function getContentType(
);
const response = await fetch(url, {
method: "HEAD",
- signal: AbortSignal.timeout(5000),
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
});
const contentType = response.headers.get("content-type");
logger.info(
@@ -536,8 +548,15 @@ async function handleAsAssetBookmark(
userId: string,
jobId: string,
bookmarkId: string,
+ abortSignal: AbortSignal,
) {
- const downloaded = await downloadAndStoreFile(url, userId, jobId, assetType);
+ const downloaded = await downloadAndStoreFile(
+ url,
+ userId,
+ jobId,
+ assetType,
+ abortSignal,
+ );
if (!downloaded) {
return;
}
@@ -586,6 +605,7 @@ async function crawlAndParseUrl(
oldFullPageArchiveAssetId: string | undefined,
precrawledArchiveAssetId: string | undefined,
archiveFullPage: boolean,
+ abortSignal: AbortSignal,
) {
let result: {
htmlContent: string;
@@ -609,8 +629,9 @@ async function crawlAndParseUrl(
url,
};
} else {
- result = await crawlPage(jobId, url);
+ result = await crawlPage(jobId, url, abortSignal);
}
+ abortSignal.throwIfAborted();
const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
@@ -619,9 +640,15 @@ async function crawlAndParseUrl(
extractReadableContent(htmlContent, browserUrl, jobId),
storeScreenshot(screenshot, userId, jobId),
]);
+ abortSignal.throwIfAborted();
let imageAssetInfo: DBAssetType | null = null;
if (meta.image) {
- const downloaded = await downloadAndStoreImage(meta.image, userId, jobId);
+ const downloaded = await downloadAndStoreImage(
+ meta.image,
+ userId,
+ jobId,
+ abortSignal,
+ );
if (downloaded) {
imageAssetInfo = {
id: downloaded.assetId,
@@ -633,6 +660,7 @@ async function crawlAndParseUrl(
};
}
}
+ abortSignal.throwIfAborted();
// TODO(important): Restrict the size of content to store
await db.transaction(async (txn) => {
@@ -682,7 +710,13 @@ async function crawlAndParseUrl(
assetId: fullPageArchiveAssetId,
size,
contentType,
- } = await archiveWebpage(htmlContent, browserUrl, userId, jobId);
+ } = await archiveWebpage(
+ htmlContent,
+ browserUrl,
+ userId,
+ jobId,
+ abortSignal,
+ );
await db.transaction(async (txn) => {
await updateAsset(
@@ -732,19 +766,33 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
);
validateUrl(url);
- const contentType = await getContentType(url, jobId);
+ const contentType = await getContentType(url, jobId, job.abortSignal);
// Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
if (isPdf) {
- await handleAsAssetBookmark(url, "pdf", userId, jobId, bookmarkId);
+ await handleAsAssetBookmark(
+ url,
+ "pdf",
+ userId,
+ jobId,
+ bookmarkId,
+ job.abortSignal,
+ );
} else if (
contentType &&
IMAGE_ASSET_TYPES.has(contentType) &&
SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType)
) {
- await handleAsAssetBookmark(url, "image", userId, jobId, bookmarkId);
+ await handleAsAssetBookmark(
+ url,
+ "image",
+ userId,
+ jobId,
+ bookmarkId,
+ job.abortSignal,
+ );
} else {
const archivalLogic = await crawlAndParseUrl(
url,
@@ -756,6 +804,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
oldFullPageArchiveAssetId,
precrawledArchiveAssetId,
archiveFullPage,
+ job.abortSignal,
);
// Enqueue openai job (if not set, assume it's true for backward compatibility)