From fd7011aff5dd8ffde0fb10990da238f7baf9a814 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sat, 1 Feb 2025 18:16:25 +0000 Subject: fix: Abort all IO when workers timeout instead of detaching. Fixes #742 --- apps/workers/openaiWorker.ts | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) (limited to 'apps/workers/openaiWorker.ts') diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts index 704a6c04..ec5681c6 100644 --- a/apps/workers/openaiWorker.ts +++ b/apps/workers/openaiWorker.ts @@ -141,6 +141,7 @@ async function inferTagsFromImage( jobId: string, bookmark: NonNullable>>, inferenceClient: InferenceClient, + abortSignal: AbortSignal, ) { const { asset, metadata } = await readAsset({ userId: bookmark.userId, @@ -161,7 +162,7 @@ async function inferTagsFromImage( ), metadata.contentType, base64, - { json: true }, + { json: true, abortSignal }, ); } @@ -226,6 +227,7 @@ async function inferTagsFromPDF( _jobId: string, bookmark: NonNullable>>, inferenceClient: InferenceClient, + abortSignal: AbortSignal, ) { const prompt = buildTextPrompt( serverConfig.inference.inferredTagLang, @@ -233,15 +235,17 @@ async function inferTagsFromPDF( `Content: ${bookmark.asset.content}`, serverConfig.inference.contextLength, ); - return inferenceClient.inferFromText(prompt, { json: true }); + return inferenceClient.inferFromText(prompt, { json: true, abortSignal }); } async function inferTagsFromText( bookmark: NonNullable>>, inferenceClient: InferenceClient, + abortSignal: AbortSignal, ) { return await inferenceClient.inferFromText(await buildPrompt(bookmark), { json: true, + abortSignal, }); } @@ -249,17 +253,28 @@ async function inferTags( jobId: string, bookmark: NonNullable>>, inferenceClient: InferenceClient, + abortSignal: AbortSignal, ) { let response; if (bookmark.link || bookmark.text) { - response = await inferTagsFromText(bookmark, inferenceClient); + response = await inferTagsFromText(bookmark, inferenceClient, abortSignal); } else if (bookmark.asset) { switch (bookmark.asset.assetType) { case "image": - response = await inferTagsFromImage(jobId, bookmark, inferenceClient); + response = await inferTagsFromImage( + jobId, + bookmark, + inferenceClient, + abortSignal, + ); break; case "pdf": - response = await inferTagsFromPDF(jobId, bookmark, inferenceClient); + response = await inferTagsFromPDF( + jobId, + bookmark, + inferenceClient, + abortSignal, + ); break; default: throw new Error(`[inference][${jobId}] Unsupported bookmark type`); @@ -413,7 +428,12 @@ async function runOpenAI(job: DequeuedJob) { `[inference][${jobId}] Starting an inference job for bookmark with id "${bookmark.id}"`, ); - const tags = await inferTags(jobId, bookmark, inferenceClient); + const tags = await inferTags( + jobId, + bookmark, + inferenceClient, + job.abortSignal, + ); await connectTags(bookmarkId, tags, bookmark.userId); -- cgit v1.2.3-70-g09d2