author    Mohamed Bassem <me@mbassem.com>  2025-02-01 18:16:25 +0000
committer Mohamed Bassem <me@mbassem.com>  2025-02-01 18:16:25 +0000
commit    fd7011aff5dd8ffde0fb10990da238f7baf9a814 (patch)
tree      99df3086a838ee33c40722d803c05c45a3a22ae3 /apps/workers
parent    0893446bed6cca753549ee8e3cf090f2fcf11d9d (diff)
fix: Abort all IO when workers timeout instead of detaching. Fixes #742
Diffstat (limited to 'apps/workers')
-rw-r--r--  apps/workers/crawlerWorker.ts  75
-rw-r--r--  apps/workers/openaiWorker.ts   32
-rw-r--r--  apps/workers/package.json       2
-rw-r--r--  apps/workers/videoWorker.ts     5
4 files changed, 92 insertions(+), 22 deletions(-)
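
The fix threads the job's AbortSignal into every piece of IO the workers perform, instead of detaching and letting the work run on after a timeout. Per-request timeouts are preserved by combining the two signals, as in this minimal sketch of the pattern the crawler changes rely on (standard Web APIs in Node 20+; the helper name is hypothetical):

    // Whichever fires first wins: the 5s per-request timeout or the
    // job-level signal raised when the worker times out the whole job.
    async function fetchWithJobSignal(url: string, jobSignal: AbortSignal) {
      return fetch(url, {
        signal: AbortSignal.any([AbortSignal.timeout(5000), jobSignal]),
      });
    }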
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts
index 6bb4f4ac..91adb185 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/crawlerWorker.ts
@@ -237,12 +237,16 @@ function validateUrl(url: string) {
}
}
-async function browserlessCrawlPage(jobId: string, url: string) {
+async function browserlessCrawlPage(
+ jobId: string,
+ url: string,
+ abortSignal: AbortSignal,
+) {
logger.info(
`[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
);
const response = await fetch(url, {
- signal: AbortSignal.timeout(5000),
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
});
logger.info(
`[Crawler][${jobId}] Successfully fetched the content of "${url}". Status: ${response.status}, Size: ${response.size}`,
@@ -258,6 +262,7 @@ async function browserlessCrawlPage(jobId: string, url: string) {
async function crawlPage(
jobId: string,
url: string,
+ abortSignal: AbortSignal,
): Promise<{
htmlContent: string;
screenshot: Buffer | undefined;
@@ -271,7 +276,7 @@ async function crawlPage(
browser = globalBrowser;
}
if (!browser) {
- return browserlessCrawlPage(jobId, url);
+ return browserlessCrawlPage(jobId, url, abortSignal);
}
const context = await browser.createBrowserContext();
@@ -412,10 +417,13 @@ async function downloadAndStoreFile(
userId: string,
jobId: string,
fileType: string,
+ abortSignal: AbortSignal,
) {
try {
logger.info(`[Crawler][${jobId}] Downloading ${fileType} from "${url}"`);
- const response = await fetch(url);
+ const response = await fetch(url, {
+ signal: abortSignal,
+ });
if (!response.ok) {
throw new Error(`Failed to download ${fileType}: ${response.status}`);
}
@@ -451,6 +459,7 @@ async function downloadAndStoreImage(
url: string,
userId: string,
jobId: string,
+ abortSignal: AbortSignal,
) {
if (!serverConfig.crawler.downloadBannerImage) {
logger.info(
@@ -458,7 +467,7 @@ async function downloadAndStoreImage(
);
return null;
}
- return downloadAndStoreFile(url, userId, jobId, "image");
+ return downloadAndStoreFile(url, userId, jobId, "image", abortSignal);
}
async function archiveWebpage(
@@ -466,6 +475,7 @@ async function archiveWebpage(
url: string,
userId: string,
jobId: string,
+ abortSignal: AbortSignal,
) {
logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
const assetId = newAssetId();
@@ -473,6 +483,7 @@ async function archiveWebpage(
await execa({
input: html,
+ cancelSignal: abortSignal,
})("monolith", ["-", "-Ije", "-t", "5", "-b", url, "-o", assetPath]);
const contentType = "text/html";
@@ -500,6 +511,7 @@ async function archiveWebpage(
async function getContentType(
url: string,
jobId: string,
+ abortSignal: AbortSignal,
): Promise<string | null> {
try {
logger.info(
@@ -507,7 +519,7 @@ async function getContentType(
);
const response = await fetch(url, {
method: "HEAD",
- signal: AbortSignal.timeout(5000),
+ signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
});
const contentType = response.headers.get("content-type");
logger.info(
@@ -536,8 +548,15 @@ async function handleAsAssetBookmark(
userId: string,
jobId: string,
bookmarkId: string,
+ abortSignal: AbortSignal,
) {
- const downloaded = await downloadAndStoreFile(url, userId, jobId, assetType);
+ const downloaded = await downloadAndStoreFile(
+ url,
+ userId,
+ jobId,
+ assetType,
+ abortSignal,
+ );
if (!downloaded) {
return;
}
@@ -586,6 +605,7 @@ async function crawlAndParseUrl(
oldFullPageArchiveAssetId: string | undefined,
precrawledArchiveAssetId: string | undefined,
archiveFullPage: boolean,
+ abortSignal: AbortSignal,
) {
let result: {
htmlContent: string;
@@ -609,8 +629,9 @@ async function crawlAndParseUrl(
url,
};
} else {
- result = await crawlPage(jobId, url);
+ result = await crawlPage(jobId, url, abortSignal);
}
+ abortSignal.throwIfAborted();
const { htmlContent, screenshot, statusCode, url: browserUrl } = result;
@@ -619,9 +640,15 @@ async function crawlAndParseUrl(
extractReadableContent(htmlContent, browserUrl, jobId),
storeScreenshot(screenshot, userId, jobId),
]);
+ abortSignal.throwIfAborted();
let imageAssetInfo: DBAssetType | null = null;
if (meta.image) {
- const downloaded = await downloadAndStoreImage(meta.image, userId, jobId);
+ const downloaded = await downloadAndStoreImage(
+ meta.image,
+ userId,
+ jobId,
+ abortSignal,
+ );
if (downloaded) {
imageAssetInfo = {
id: downloaded.assetId,
@@ -633,6 +660,7 @@ async function crawlAndParseUrl(
};
}
}
+ abortSignal.throwIfAborted();
// TODO(important): Restrict the size of content to store
await db.transaction(async (txn) => {
@@ -682,7 +710,13 @@ async function crawlAndParseUrl(
assetId: fullPageArchiveAssetId,
size,
contentType,
- } = await archiveWebpage(htmlContent, browserUrl, userId, jobId);
+ } = await archiveWebpage(
+ htmlContent,
+ browserUrl,
+ userId,
+ jobId,
+ abortSignal,
+ );
await db.transaction(async (txn) => {
await updateAsset(
@@ -732,19 +766,33 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
);
validateUrl(url);
- const contentType = await getContentType(url, jobId);
+ const contentType = await getContentType(url, jobId, job.abortSignal);
// Link bookmarks get transformed into asset bookmarks if they point to a supported asset instead of a webpage
const isPdf = contentType === ASSET_TYPES.APPLICATION_PDF;
if (isPdf) {
- await handleAsAssetBookmark(url, "pdf", userId, jobId, bookmarkId);
+ await handleAsAssetBookmark(
+ url,
+ "pdf",
+ userId,
+ jobId,
+ bookmarkId,
+ job.abortSignal,
+ );
} else if (
contentType &&
IMAGE_ASSET_TYPES.has(contentType) &&
SUPPORTED_UPLOAD_ASSET_TYPES.has(contentType)
) {
- await handleAsAssetBookmark(url, "image", userId, jobId, bookmarkId);
+ await handleAsAssetBookmark(
+ url,
+ "image",
+ userId,
+ jobId,
+ bookmarkId,
+ job.abortSignal,
+ );
} else {
const archivalLogic = await crawlAndParseUrl(
url,
@@ -756,6 +804,7 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
oldFullPageArchiveAssetId,
precrawledArchiveAssetId,
archiveFullPage,
+ job.abortSignal,
);
// Enqueue openai job (if not set, assume it's true for backward compatibility)
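
Between the stages of crawlAndParseUrl, the hunks above also insert abortSignal.throwIfAborted() checkpoints, so a timed-out job stops before the next expensive step rather than only inside the current one. A minimal sketch of the pattern (the stage functions are illustrative stubs, not the real helpers):

    // throwIfAborted() rethrows the signal's abort reason (e.g. a
    // TimeoutError), so work stops between awaits as well as inside them.
    const crawl = async (signal: AbortSignal) => "<html></html>";
    const extract = async (html: string) => ({ title: "example" });
    const persist = async (meta: { title: string }) => undefined;

    async function pipeline(signal: AbortSignal) {
      const html = await crawl(signal);
      signal.throwIfAborted(); // bail before extraction if the job was aborted
      const meta = await extract(html);
      signal.throwIfAborted(); // bail before the DB write
      await persist(meta);
    }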
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 704a6c04..ec5681c6 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -141,6 +141,7 @@ async function inferTagsFromImage(
jobId: string,
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
+ abortSignal: AbortSignal,
) {
const { asset, metadata } = await readAsset({
userId: bookmark.userId,
@@ -161,7 +162,7 @@ async function inferTagsFromImage(
),
metadata.contentType,
base64,
- { json: true },
+ { json: true, abortSignal },
);
}
@@ -226,6 +227,7 @@ async function inferTagsFromPDF(
_jobId: string,
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
+ abortSignal: AbortSignal,
) {
const prompt = buildTextPrompt(
serverConfig.inference.inferredTagLang,
@@ -233,15 +235,17 @@ async function inferTagsFromPDF(
`Content: ${bookmark.asset.content}`,
serverConfig.inference.contextLength,
);
- return inferenceClient.inferFromText(prompt, { json: true });
+ return inferenceClient.inferFromText(prompt, { json: true, abortSignal });
}
async function inferTagsFromText(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
+ abortSignal: AbortSignal,
) {
return await inferenceClient.inferFromText(await buildPrompt(bookmark), {
json: true,
+ abortSignal,
});
}
@@ -249,17 +253,28 @@ async function inferTags(
jobId: string,
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
inferenceClient: InferenceClient,
+ abortSignal: AbortSignal,
) {
let response;
if (bookmark.link || bookmark.text) {
- response = await inferTagsFromText(bookmark, inferenceClient);
+ response = await inferTagsFromText(bookmark, inferenceClient, abortSignal);
} else if (bookmark.asset) {
switch (bookmark.asset.assetType) {
case "image":
- response = await inferTagsFromImage(jobId, bookmark, inferenceClient);
+ response = await inferTagsFromImage(
+ jobId,
+ bookmark,
+ inferenceClient,
+ abortSignal,
+ );
break;
case "pdf":
- response = await inferTagsFromPDF(jobId, bookmark, inferenceClient);
+ response = await inferTagsFromPDF(
+ jobId,
+ bookmark,
+ inferenceClient,
+ abortSignal,
+ );
break;
default:
throw new Error(`[inference][${jobId}] Unsupported bookmark type`);
@@ -413,7 +428,12 @@ async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
`[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`,
);
- const tags = await inferTags(jobId, bookmark, inferenceClient);
+ const tags = await inferTags(
+ jobId,
+ bookmark,
+ inferenceClient,
+ job.abortSignal,
+ );
await connectTags(bookmarkId, tags, bookmark.userId);
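
In the openaiWorker changes, the signal rides along in the inference options object. The diff does not show InferenceClient's internals, but a fetch-based client would typically forward it to the HTTP layer; a hypothetical sketch (names and endpoint are placeholders):

    interface InferenceOptions {
      json?: boolean;
      abortSignal?: AbortSignal;
    }

    async function inferFromText(prompt: string, opts: InferenceOptions = {}) {
      const res = await fetch("https://inference.example.com/v1/complete", {
        method: "POST",
        headers: { "content-type": "application/json" },
        body: JSON.stringify({ prompt, json: opts.json ?? false }),
        signal: opts.abortSignal, // aborting the job cancels the HTTP request
      });
      return res.json();
    }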
diff --git a/apps/workers/package.json b/apps/workers/package.json
index e11a85d6..5a0c1b86 100644
--- a/apps/workers/package.json
+++ b/apps/workers/package.json
@@ -17,7 +17,7 @@
"drizzle-orm": "^0.38.3",
"execa": "9.3.1",
"jsdom": "^24.0.0",
- "liteque": "^0.3.0",
+ "liteque": "^0.3.2",
"metascraper": "^5.45.24",
"metascraper-amazon": "^5.45.22",
"metascraper-description": "^5.45.22",
diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts
index 10f18454..32f16f97 100644
--- a/apps/workers/videoWorker.ts
+++ b/apps/workers/videoWorker.ts
@@ -104,7 +104,9 @@ async function runWorker(job: DequeuedJob<ZVideoRequest>) {
`[VideoCrawler][${jobId}] Attempting to download a file from "${url}" to "${assetPath}" using the following arguments: "${ytDlpArguments}"`,
);
- await execa("yt-dlp", ytDlpArguments);
+ await execa("yt-dlp", ytDlpArguments, {
+ cancelSignal: job.abortSignal,
+ });
const downloadPath = await findAssetFile(videoAssetId);
if (!downloadPath) {
logger.info(
@@ -124,7 +126,6 @@ async function runWorker(job: DequeuedJob<ZVideoRequest>) {
);
return;
}
- console.log(JSON.stringify(err));
logger.error(
`[VideoCrawler][${jobId}] Failed to download a file from "${url}" to "${assetPath}"`,
);
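
The same signal also covers child processes: execa 9 accepts a cancelSignal option that kills the spawned process (yt-dlp here, monolith in the crawler) when the signal fires, rather than leaving it running after the worker gives up. A minimal sketch with placeholder arguments:

    import { execa } from "execa";

    // execa kills the child and rejects the returned promise once
    // cancelSignal aborts; the yt-dlp arguments are placeholders.
    async function downloadVideo(url: string, signal: AbortSignal) {
      await execa("yt-dlp", [url, "-o", "video.%(ext)s"], {
        cancelSignal: signal,
      });
    }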