| author | Mohamed Bassem <me@mbassem.com> | 2025-02-01 18:16:25 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-02-01 18:16:25 +0000 |
| commit | fd7011aff5dd8ffde0fb10990da238f7baf9a814 (patch) | |
| tree | 99df3086a838ee33c40722d803c05c45a3a22ae3 | /apps/workers |
| parent | 0893446bed6cca753549ee8e3cf090f2fcf11d9d (diff) | |
| download | karakeep-fd7011aff5dd8ffde0fb10990da238f7baf9a814.tar.zst | |
fix: Abort all IO when workers timeout instead of detaching. Fixes #742
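The fix threads each job's `AbortSignal` through every piece of IO the workers perform (HTTP fetches, the `monolith` and `yt-dlp` subprocesses, inference calls), so a worker-level timeout cancels in-flight work instead of detaching from it. A minimal sketch of the recurring pattern in the diff below, assuming Node 20+ where `AbortSignal.any()` is available; `fetchWithJobAbort` is an illustrative helper, not repo code:

```ts
// Illustrative helper, not part of the repo: combines a per-request timeout
// with the job-level signal, the pattern this commit applies throughout.
async function fetchWithJobAbort(
  url: string,
  jobSignal: AbortSignal,
  timeoutMs = 5000,
): Promise<Response> {
  // Abort when EITHER the per-request timeout fires OR the job-level signal
  // does (e.g. the queue timed the whole job out and aborted it).
  const signal = AbortSignal.any([AbortSignal.timeout(timeoutMs), jobSignal]);
  const response = await fetch(url, { signal });
  // Checkpoint between async steps: if the job was aborted while we were
  // awaiting, rethrow its abort reason instead of doing further work.
  jobSignal.throwIfAborted();
  return response;
}
```

The same checkpointing shows up in the crawler as bare `abortSignal.throwIfAborted()` calls between expensive steps.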
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/crawlerWorker.ts | 75 |
| -rw-r--r-- | apps/workers/openaiWorker.ts | 32 |
| -rw-r--r-- | apps/workers/package.json | 2 |
| -rw-r--r-- | apps/workers/videoWorker.ts | 5 |
4 files changed, 92 insertions, 22 deletions
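The `liteque` bump (`^0.3.0` to `^0.3.2`) presumably supplies the `abortSignal` field the workers read off each dequeued job; the rest of the diff threads that signal downward. A rough sketch of the shape the workers rely on — the interface and names below are illustrative stand-ins, not liteque's actual exports:

```ts
// Illustrative stand-in for the queue's dequeued-job shape: each job carries
// an AbortSignal that the queue aborts when the job times out.
interface DequeuedJobLike<T> {
  id: string;
  data: T;
  abortSignal: AbortSignal;
}

async function runJob<T>(
  job: DequeuedJobLike<T>,
  work: (data: T, signal: AbortSignal) => Promise<void>,
) {
  // Thread the job-level signal into every piece of IO the job performs,
  // so a timeout cancels that IO instead of detaching from it.
  await work(job.data, job.abortSignal);
}
```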
```diff
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 6bb4f4ac..91adb185 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -237,12 +237,16 @@ function validateUrl(url: string) {
   }
 }
 
-async function browserlessCrawlPage(jobId: string, url: string) {
+async function browserlessCrawlPage(
+  jobId: string,
+  url: string,
+  abortSignal: AbortSignal,
+) {
   logger.info(
     `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
   );
   const response = await fetch(url, {
-    signal: AbortSignal.timeout(5000),
+    signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
   });
   logger.info(
     `[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
@@ -258,6 +262,7 @@ async function browserlessCrawlPage(jobId: string, url: string) {
 async function crawlPage(
   jobId: string,
   url: string,
+  abortSignal: AbortSignal,
 ): Promise<{
   htmlContent: string;
   screenshot: Buffer | undefined;
@@ -271,7 +276,7 @@ async function crawlPage(
     browser = globalBrowser;
   }
   if (!browser) {
-    return browserlessCrawlPage(jobId, url);
+    return browserlessCrawlPage(jobId, url, abortSignal);
   }
 
   const context = await browser.createBrowserContext();
@@ -412,10 +417,13 @@ async function downloadAndStoreFile(
   userId: string,
   jobId: string,
   fileType: string,
+  abortSignal: AbortSignal,
 ) {
   try {
     logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`);
-    const response = await fetch(url);
+    const response = await fetch(url, {
+      signal: abortSignal,
+    });
     if (!response.ok) {
       throw new Error(`Failed to download ${fileType}: ${response.status}`);
     }
@@ -451,6 +459,7 @@ async function downloadAndStoreImage(
   url: string,
   userId: string,
   jobId: string,
+  abortSignal: AbortSignal,
 ) {
   if (!serverConfig.crawler.downloadBannerImage) {
     logger.info(
@@ -458,7 +467,7 @@ async function downloadAndStoreImage(
     );
     return null;
   }
-  return downloadAndStoreFile(url, userId, jobId, "image");
+  return downloadAndStoreFile(url, userId, jobId, "image", abortSignal);
 }
 
 async function archiveWebpage(
@@ -466,6 +475,7 @@
   url: string,
   userId: string,
   jobId: string,
+  abortSignal: AbortSignal,
 ) {
   logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
   const assetId = newAssetId();
@@ -473,6 +483,7 @@
   await execa({
     input: html,
+    cancelSignal: abortSignal,
   })("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
 
   const contentType = "text/html";
@@ -500,6 +511,7 @@
 async function getContentType(
   url: string,
   jobId: string,
+  abortSignal: AbortSignal,
 ): Promise<string | null> {
   try {
     logger.info(
@@ -507,7 +519,7 @@
     );
     const response = await fetch(url, {
       method: "HEAD",
-      signal: AbortSignal.timeout(5000),
+      signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
     });
     const contentType = response.headers.get("content-type");
     logger.info(
@@ -536,8 +548,15 @@ async function handleAsAssetBookmark(
   userId: string,
   jobId: string,
   bookmarkId: string,
+  abortSignal: AbortSignal,
 ) {
-  const downloaded = await downloadAndStoreFile(url, userId, jobId, assetType);
+  const downloaded = await downloadAndStoreFile(
+    url,
+    userId,
+    jobId,
+    assetType,
+    abortSignal,
+  );
   if (!downloaded) {
     return;
   }
@@ -586,6 +605,7 @@ async function crawlAndParseUrl(
   oldFullPageArchiveAssetId: string | undefined,
   precrawledArchiveAssetId: string | undefined,
   archiveFullPage: boolean,
+  abortSignal: AbortSignal,
 ) {
   let result: {
     htmlContent: string;
     screenshot: Buffer | undefined;
@@ -609,8 +629,9 @@ async function crawlAndParseUrl(
       url,
     };
   } else {
-    result = await crawlPage(jobId, url);
+    result = await crawlPage(jobId, url, abortSignal);
   }
+  abortSignal.throwIfAborted();
 
   const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
@@ -619,9 +640,15 @@ async function crawlAndParseUrl(
     extractReadableContent(htmlContent, browserUrl, jobId),
     storeScreenshot(screenshot, userId, jobId),
   ]);
+  abortSignal.throwIfAborted();
 
   let imageAssetInfo: DBAssetType | null = null;
   if (meta.image) {
-    const downloaded = await downloadAndStoreImage(meta.image, userId, jobId);
+    const downloaded = await downloadAndStoreImage(
+      meta.image,
+      userId,
+      jobId,
+      abortSignal,
+    );
     if (downloaded) {
       imageAssetInfo = {
         id: downloaded.assetId,
@@ -633,6 +660,7 @@ async function crawlAndParseUrl(
       };
     }
   }
+  abortSignal.throwIfAborted();
 
   // TODO(important): Restrict the size of content to store
   await db.transaction(async (txn) => {
@@ -682,7 +710,13 @@ async function crawlAndParseUrl(
       assetId: fullPageArchiveAssetId,
       size,
       contentType,
-    } = await archiveWebpage(htmlContent, browserUrl, userId, jobId);
+    } = await archiveWebpage(
+      htmlContent,
+      browserUrl,
+      userId,
+      jobId,
+      abortSignal,
+    );
 
     await db.transaction(async (txn) => {
       await updateAsset(
@@ -732,19 +766,33 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   );
   validateUrl(url);
 
-  const contentType = await getContentType(url, jobId);
+  const contentType = await getContentType(url, jobId, job.abortSignal);
 
   // Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
   const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
   if (isPdf) {
-    await handleAsAssetBookmark(url, "pdf", userId, jobId, bookmarkId);
+    await handleAsAssetBookmark(
+      url,
+      "pdf",
+      userId,
+      jobId,
+      bookmarkId,
+      job.abortSignal,
+    );
   } else if (
     contentType &&
     IMAGE_ASSET_TYPES.has(contentType) &&
     SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType)
   ) {
-    await handleAsAssetBookmark(url, "image", userId, jobId, bookmarkId);
+    await handleAsAssetBookmark(
+      url,
+      "image",
+      userId,
+      jobId,
+      bookmarkId,
+      job.abortSignal,
+    );
   } else {
     const archivalLogic = await crawlAndParseUrl(
       url,
@@ -756,6 +804,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
       oldFullPageArchiveAssetId,
       precrawledArchiveAssetId,
       archiveFullPage,
+      job.abortSignal,
     );
 
     // Enqueue openai job (if not set, assume it's true for backward compatibility)
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 704a6c04..ec5681c6 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -141,6 +141,7 @@ async function inferTagsFromImage(
   jobId: string,
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
   inferenceClient: InferenceClient,
+  abortSignal: AbortSignal,
 ) {
   const { asset, metadata } = await readAsset({
     userId: bookmark.userId,
@@ -161,7 +162,7 @@ async function inferTagsFromImage(
     ),
     metadata.contentType,
     base64,
-    { json: true },
+    { json: true, abortSignal },
   );
 }
 
@@ -226,6 +227,7 @@ async function inferTagsFromPDF(
   _jobId: string,
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
   inferenceClient: InferenceClient,
+  abortSignal: AbortSignal,
 ) {
   const prompt = buildTextPrompt(
     serverConfig.inference.inferredTagLang,
     `Content:
 ${bookmark.asset.content}`,
     serverConfig.inference.contextLength,
   );
-  return inferenceClient.inferFromText(prompt, { json: true });
+  return inferenceClient.inferFromText(prompt, { json: true, abortSignal });
 }
 
 async function inferTagsFromText(
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
   inferenceClient: InferenceClient,
+  abortSignal: AbortSignal,
 ) {
   return await inferenceClient.inferFromText(await buildPrompt(bookmark), {
     json: true,
+    abortSignal,
   });
 }
@@ -249,17 +253,28 @@ async function inferTags(
   jobId: string,
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
   inferenceClient: InferenceClient,
+  abortSignal: AbortSignal,
 ) {
   let response;
   if (bookmark.link || bookmark.text) {
-    response = await inferTagsFromText(bookmark, inferenceClient);
+    response = await inferTagsFromText(bookmark, inferenceClient, abortSignal);
   } else if (bookmark.asset) {
     switch (bookmark.asset.assetType) {
       case "image":
-        response = await inferTagsFromImage(jobId, bookmark, inferenceClient);
+        response = await inferTagsFromImage(
+          jobId,
+          bookmark,
+          inferenceClient,
+          abortSignal,
+        );
         break;
       case "pdf":
-        response = await inferTagsFromPDF(jobId, bookmark, inferenceClient);
+        response = await inferTagsFromPDF(
+          jobId,
+          bookmark,
+          inferenceClient,
+          abortSignal,
+        );
         break;
       default:
         throw new Error(`[inference][${jobId}] Unsupported bookmark type`);
@@ -413,7 +428,12 @@ async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
     `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`,
   );
 
-  const tags = await inferTags(jobId, bookmark, inferenceClient);
+  const tags = await inferTags(
+    jobId,
+    bookmark,
+    inferenceClient,
+    job.abortSignal,
+  );
 
   await connectTags(bookmarkId, tags, bookmark.userId);
diff --git a/apps/workers/package.json b/apps/workers/package.json
index e11a85d6..5a0c1b86 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -17,7 +17,7 @@
     "drizzle-orm": "^0.38.3",
     "execa": "9.3.1",
     "jsdom": "^24.0.0",
-    "liteque": "^0.3.0",
+    "liteque": "^0.3.2",
     "metascraper": "^5.45.24",
     "metascraper-amazon": "^5.45.22",
     "metascraper-description": "^5.45.22",
diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts
index 10f18454..32f16f97 100644
--- a/apps/workers/videoWorker.ts
+++ b/apps/workers/videoWorker.ts
@@ -104,7 +104,9 @@ async function runWorker(job: DequeuedJob<ZVideoRequest>) {
     `[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`,
   );
 
-  await execa("yt-dlp", ytDlpArguments);
+  await execa("yt-dlp", ytDlpArguments, {
+    cancelSignal: job.abortSignal,
+  });
   const downloadPath = await findAssetFile(videoAssetId);
   if (!downloadPath) {
     logger.info(
@@ -124,7 +126,6 @@ async function runWorker(job: DequeuedJob<ZVideoRequest>) {
       );
       return;
     }
-    console.log(JSON.stringify(err));
     logger.error(
       `[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`,
     );
```
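For subprocesses, the job signal is passed to execa as `cancelSignal` (an execa 9 option), so a timed-out job kills its `yt-dlp` or `monolith` child instead of orphaning it. A hedged sketch of calling a downloader that way and telling cancellation apart from a real failure — the helper and its error handling are illustrative, not repo code:

```ts
import { execa } from "execa";

// Illustrative helper: run yt-dlp under a job-level AbortSignal using
// execa's `cancelSignal` option (execa >= 9).
async function downloadWithAbort(args: string[], abortSignal: AbortSignal) {
  try {
    await execa("yt-dlp", args, { cancelSignal: abortSignal });
  } catch (err) {
    // execa flags subprocesses killed through `cancelSignal` with
    // `isCanceled`, so callers can distinguish an aborted job from a
    // genuinely failed download.
    if (err instanceof Error && "isCanceled" in err && err.isCanceled) {
      return; // the job was aborted; stop quietly
    }
    throw err;
  }
}
```

Dropping the stray `console.log(JSON.stringify(err))` in the same hunk keeps the worker's error path going through the structured logger only.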
