Diffstat (limited to 'apps')
-rw-r--r--  apps/web/components/admin/BackgroundJobs.tsx | 24
-rw-r--r--  apps/web/lib/i18n/locales/en/translation.json | 2
-rw-r--r--  apps/workers/index.ts | 27
-rw-r--r--  apps/workers/workers/assetPreprocessingWorker.ts (renamed from apps/workers/assetPreprocessingWorker.ts) | 1
-rw-r--r--  apps/workers/workers/crawlerWorker.ts (renamed from apps/workers/crawlerWorker.ts) | 5
-rw-r--r--  apps/workers/workers/feedWorker.ts (renamed from apps/workers/feedWorker.ts) | 0
-rw-r--r--  apps/workers/workers/inference/inferenceWorker.ts | 100
-rw-r--r--  apps/workers/workers/inference/summarize.ts | 123
-rw-r--r--  apps/workers/workers/inference/tagging.ts (renamed from apps/workers/openaiWorker.ts) | 104
-rw-r--r--  apps/workers/workers/ruleEngineWorker.ts (renamed from apps/workers/ruleEngineWorker.ts) | 0
-rw-r--r--  apps/workers/workers/searchWorker.ts (renamed from apps/workers/searchWorker.ts) | 0
-rw-r--r--  apps/workers/workers/tidyAssetsWorker.ts (renamed from apps/workers/tidyAssetsWorker.ts) | 0
-rw-r--r--  apps/workers/workers/videoWorker.ts (renamed from apps/workers/videoWorker.ts) | 4
-rw-r--r--  apps/workers/workers/webhookWorker.ts (renamed from apps/workers/webhookWorker.ts) | 0
14 files changed, 288 insertions, 102 deletions
diff --git a/apps/web/components/admin/BackgroundJobs.tsx b/apps/web/components/admin/BackgroundJobs.tsx
index 217e2ad9..ac5885ef 100644
--- a/apps/web/components/admin/BackgroundJobs.tsx
+++ b/apps/web/components/admin/BackgroundJobs.tsx
@@ -127,7 +127,7 @@ function AdminActions() {
         variant="destructive"
         loading={isInferencePending}
         onClick={() =>
-          reRunInferenceOnAllBookmarks({ taggingStatus: "failure" })
+          reRunInferenceOnAllBookmarks({ type: "tag", status: "failure" })
         }
       >
         {t("admin.actions.regenerate_ai_tags_for_failed_bookmarks_only")}
@@ -135,12 +135,32 @@ function AdminActions() {
       <ActionButton
         variant="destructive"
         loading={isInferencePending}
-        onClick={() => reRunInferenceOnAllBookmarks({ taggingStatus: "all" })}
+        onClick={() =>
+          reRunInferenceOnAllBookmarks({ type: "tag", status: "all" })
+        }
       >
         {t("admin.actions.regenerate_ai_tags_for_all_bookmarks")}
       </ActionButton>
       <ActionButton
         variant="destructive"
+        loading={isInferencePending}
+        onClick={() =>
+          reRunInferenceOnAllBookmarks({ type: "summarize", status: "failure" })
+        }
+      >
+        {t("admin.actions.regenerate_ai_summaries_for_failed_bookmarks_only")}
+      </ActionButton>
+      <ActionButton
+        variant="destructive"
+        loading={isInferencePending}
+        onClick={() =>
+          reRunInferenceOnAllBookmarks({ type: "summarize", status: "all" })
+        }
+      >
+        {t("admin.actions.regenerate_ai_summaries_for_all_bookmarks")}
+      </ActionButton>
+      <ActionButton
+        variant="destructive"
         loading={isReindexPending}
         onClick={() => reindexBookmarks()}
       >
diff --git a/apps/web/lib/i18n/locales/en/translation.json b/apps/web/lib/i18n/locales/en/translation.json
index c26c9523..1eef3ac4 100644
--- a/apps/web/lib/i18n/locales/en/translation.json
+++ b/apps/web/lib/i18n/locales/en/translation.json
@@ -249,6 +249,8 @@
       "without_inference": "Without Inference",
       "regenerate_ai_tags_for_failed_bookmarks_only": "Regenerate AI Tags for Failed Bookmarks Only",
       "regenerate_ai_tags_for_all_bookmarks": "Regenerate AI Tags for All Bookmarks",
+      "regenerate_ai_summaries_for_failed_bookmarks_only": "Regenerate AI Summaries for Failed Bookmarks Only",
+      "regenerate_ai_summaries_for_all_bookmarks": "Regenerate AI Summaries for All Bookmarks",
       "reindex_all_bookmarks": "Reindex All Bookmarks",
       "compact_assets": "Compact Assets",
       "reprocess_assets_fix_mode": "Reprocess Assets (Fix Mode)"
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 208666c7..1cc1ce49 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,20 +1,19 @@
 import "dotenv/config";
 
-import { AssetPreprocessingWorker } from "assetPreprocessingWorker";
-import { FeedRefreshingWorker, FeedWorker } from "feedWorker";
-import { RuleEngineWorker } from "ruleEngineWorker";
-import { TidyAssetsWorker } from "tidyAssetsWorker";
-
 import serverConfig from "@karakeep/shared/config";
 import logger from "@karakeep/shared/logger";
 import { runQueueDBMigrations } from "@karakeep/shared/queues";
 
-import { CrawlerWorker } from "./crawlerWorker";
 import { shutdownPromise } from "./exit";
-import { OpenAiWorker } from "./openaiWorker";
-import { SearchIndexingWorker } from "./searchWorker";
-import { VideoWorker } from "./videoWorker";
-import { WebhookWorker } from "./webhookWorker";
+import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
+import { CrawlerWorker } from "./workers/crawlerWorker";
+import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
+import { OpenAiWorker } from "./workers/inference/inferenceWorker";
+import { RuleEngineWorker } from "./workers/ruleEngineWorker";
+import { SearchIndexingWorker } from "./workers/searchWorker";
+import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
+import { VideoWorker } from "./workers/videoWorker";
+import { WebhookWorker } from "./workers/webhookWorker";
 
 async function main() {
   logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
@@ -22,7 +21,7 @@ async function main() {
 
   const [
     crawler,
-    openai,
+    inference,
     search,
     tidyAssets,
     video,
@@ -46,7 +45,7 @@ async function main() {
   await Promise.any([
     Promise.all([
       crawler.run(),
-      openai.run(),
+      inference.run(),
       search.run(),
      tidyAssets.run(),
      video.run(),
@@ -58,12 +57,12 @@ async function main() {
     shutdownPromise,
   ]);
   logger.info(
-    "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
+    "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
   );
 
   FeedRefreshingWorker.stop();
   crawler.stop();
-  openai.stop();
+  inference.stop();
   search.stop();
   tidyAssets.stop();
   video.stop();
diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index a678b706..0c0b7aec 100644
--- a/apps/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -330,6 +330,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
   if (!isFixMode || anythingChanged) {
     await OpenAIQueue.enqueue({
       bookmarkId,
+      type: "tag",
     });
 
     // Update the search index
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index a40cbe53..b928e145 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -859,6 +859,11 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
   if (job.data.runInference !== false) {
     await OpenAIQueue.enqueue({
       bookmarkId,
+      type: "tag",
+    });
+    await OpenAIQueue.enqueue({
+      bookmarkId,
+      type: "summarize",
     });
   }
 
diff --git a/apps/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 1eaba0c3..1eaba0c3 100644
--- a/apps/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
new file mode 100644
index 00000000..f7492c8b
--- /dev/null
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -0,0 +1,100 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+
+import type { ZOpenAIRequest } from "@karakeep/shared/queues";
+import { db } from "@karakeep/db";
+import { bookmarks } from "@karakeep/db/schema";
+import serverConfig from "@karakeep/shared/config";
+import { InferenceClientFactory } from "@karakeep/shared/inference";
+import logger from "@karakeep/shared/logger";
+import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues";
+
+import { runSummarization } from "./summarize";
+import { runTagging } from "./tagging";
+
+async function attemptMarkStatus(
+  jobData: object | undefined,
+  status: "success" | "failure",
+) {
+  if (!jobData) {
+    return;
+  }
+  try {
+    const request = zOpenAIRequestSchema.parse(jobData);
+    await db
+      .update(bookmarks)
+      .set({
+        ...(request.type === "summarize"
+          ? { summarizationStatus: status }
+          : {}),
+        ...(request.type === "tag" ? { taggingStatus: status } : {}),
+      })
+      .where(eq(bookmarks.id, request.bookmarkId));
+  } catch (e) {
+    logger.error(`Something went wrong when marking the tagging status: ${e}`);
+  }
+}
+
+export class OpenAiWorker {
+  static build() {
+    logger.info("Starting inference worker ...");
+    const worker = new Runner<ZOpenAIRequest>(
+      OpenAIQueue,
+      {
+        run: runOpenAI,
+        onComplete: async (job) => {
+          const jobId = job.id;
+          logger.info(`[inference][${jobId}] Completed successfully`);
+          await attemptMarkStatus(job.data, "success");
+        },
+        onError: async (job) => {
+          const jobId = job.id;
+          logger.error(
+            `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
+          );
+          if (job.numRetriesLeft == 0) {
+            await attemptMarkStatus(job?.data, "failure");
+          }
+        },
+      },
+      {
+        concurrency: 1,
+        pollIntervalMs: 1000,
+        timeoutSecs: serverConfig.inference.jobTimeoutSec,
+      },
+    );
+
+    return worker;
+  }
+}
+
+async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
+  const jobId = job.id;
+
+  const inferenceClient = InferenceClientFactory.build();
+  if (!inferenceClient) {
+    logger.debug(
+      `[inference][${jobId}] No inference client configured, nothing to do now`,
+    );
+    return;
+  }
+
+  const request = zOpenAIRequestSchema.safeParse(job.data);
+  if (!request.success) {
+    throw new Error(
+      `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`,
+    );
+  }
+
+  const { bookmarkId } = request.data;
+  switch (request.data.type) {
+    case "summarize":
+      await runSummarization(bookmarkId, job, inferenceClient);
+      break;
+    case "tag":
+      await runTagging(bookmarkId, job, inferenceClient);
+      break;
+    default:
+      throw new Error(`Unknown inference type: ${request.data.type}`);
+  }
+}
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
new file mode 100644
index 00000000..a832fe0a
--- /dev/null
+++ b/apps/workers/workers/inference/summarize.ts
@@ -0,0 +1,123 @@
+import { and, eq } from "drizzle-orm";
+import { DequeuedJob } from "liteque";
+
+import { db } from "@karakeep/db";
+import { bookmarks, customPrompts } from "@karakeep/db/schema";
+import serverConfig from "@karakeep/shared/config";
+import { InferenceClient } from "@karakeep/shared/inference";
+import logger from "@karakeep/shared/logger";
+import { buildSummaryPrompt } from "@karakeep/shared/prompts";
+import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+
+async function fetchBookmarkDetailsForSummary(bookmarkId: string) {
+  const bookmark = await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, bookmarkId),
+    columns: { id: true, userId: true, type: true },
+    with: {
+      link: {
+        columns: {
+          title: true,
+          description: true,
+          content: true,
+          publisher: true,
+          author: true,
+          url: true,
+        },
+      },
+      // If assets (like PDFs with extracted text) should be summarized, extend here
+    },
+  });
+
+  if (!bookmark) {
+    throw new Error(`Bookmark with id ${bookmarkId} not found`);
+  }
+  return bookmark;
+}
+
+export async function runSummarization(
+  bookmarkId: string,
+  job: DequeuedJob<ZOpenAIRequest>,
+  inferenceClient: InferenceClient,
+) {
+  if (!serverConfig.inference.enableAutoSummarization) {
+    logger.info(
+      `[inference][${job.id}] Skipping summarization job for bookmark with id "${bookmarkId}" because it's disabled in the config.`,
+    );
+    return;
+  }
+  const jobId = job.id;
+
+  logger.info(
+    `[inference][${jobId}] Starting a summary job for bookmark with id "${bookmarkId}"`,
+  );
+
+  const bookmarkData = await fetchBookmarkDetailsForSummary(bookmarkId);
+
+  let textToSummarize = "";
+  if (bookmarkData.type === BookmarkTypes.LINK && bookmarkData.link) {
+    const link = bookmarkData.link;
+    textToSummarize = `
+Title: ${link.title ?? ""}
+Description: ${link.description ?? ""}
+Content: ${link.content ?? ""}
+Publisher: ${link.publisher ?? ""}
+Author: ${link.author ?? ""}
+URL: ${link.url ?? ""}
+`;
+  } else {
+    logger.warn(
+      `[inference][${jobId}] Bookmark ${bookmarkId} (type: ${bookmarkData.type}) is not a LINK or TEXT type with content, or content is missing. Skipping summary.`,
+    );
+    return;
+  }
+
+  if (!textToSummarize.trim()) {
+    logger.info(
+      `[inference][${jobId}] No content to summarize for bookmark ${bookmarkId}.`,
+    );
+    return;
+  }
+
+  const prompts = await db.query.customPrompts.findMany({
+    where: and(
+      eq(customPrompts.userId, bookmarkData.userId),
+      eq(customPrompts.appliesTo, "summary"),
+    ),
+    columns: {
+      text: true,
+    },
+  });
+
+  const summaryPrompt = buildSummaryPrompt(
+    serverConfig.inference.inferredTagLang,
+    prompts.map((p) => p.text),
+    textToSummarize,
+    serverConfig.inference.contextLength,
+  );
+
+  const summaryResult = await inferenceClient.inferFromText(summaryPrompt, {
+    schema: null, // Summaries are typically free-form text
+    abortSignal: job.abortSignal,
+  });
+
+  if (!summaryResult.response) {
+    throw new Error(
+      `[inference][${jobId}] Failed to summarize bookmark ${bookmarkId}, empty response from inference client.`,
+    );
+  }
+
+  logger.info(
+    `[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`,
+  );
+
+  await db
+    .update(bookmarks)
+    .set({
+      summary: summaryResult.response,
+      modifiedAt: new Date(),
+    })
+    .where(eq(bookmarks.id, bookmarkId));
+
+  await triggerSearchReindex(bookmarkId);
+}
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/workers/inference/tagging.ts
index c8b2770e..35c366c7 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,5 +1,5 @@
 import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob } from "liteque";
 import { buildImpersonatingTRPCClient } from "trpc";
 import { z } from "zod";
 
@@ -14,15 +14,12 @@ import {
 } from "@karakeep/db/schema";
 import { readAsset } from "@karakeep/shared/assetdb";
 import serverConfig from "@karakeep/shared/config";
-import { InferenceClientFactory } from "@karakeep/shared/inference";
 import logger from "@karakeep/shared/logger";
 import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
 import {
-  OpenAIQueue,
   triggerRuleEngineOnEvent,
   triggerSearchReindex,
   triggerWebhook,
-  zOpenAIRequestSchema,
 } from "@karakeep/shared/queues";
 
 const openAIResponseSchema = z.object({
@@ -39,60 +36,6 @@ function tagNormalizer(col: Column) {
     sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`,
   };
 }
-
-async function attemptMarkTaggingStatus(
-  jobData: object | undefined,
-  status: "success" | "failure",
-) {
-  if (!jobData) {
-    return;
-  }
-  try {
-    const request = zOpenAIRequestSchema.parse(jobData);
-    await db
-      .update(bookmarks)
-      .set({
-        taggingStatus: status,
-      })
-      .where(eq(bookmarks.id, request.bookmarkId));
-  } catch (e) {
-    logger.error(`Something went wrong when marking the tagging status: ${e}`);
-  }
-}
-
-export class OpenAiWorker {
-  static build() {
-    logger.info("Starting inference worker ...");
-    const worker = new Runner<ZOpenAIRequest>(
-      OpenAIQueue,
-      {
-        run: runOpenAI,
-        onComplete: async (job) => {
-          const jobId = job.id;
-          logger.info(`[inference][${jobId}] Completed successfully`);
-          await attemptMarkTaggingStatus(job.data, "success");
-        },
-        onError: async (job) => {
-          const jobId = job.id;
-          logger.error(
-            `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
-          );
-          if (job.numRetriesLeft == 0) {
-            await attemptMarkTaggingStatus(job?.data, "failure");
-          }
-        },
-      },
-      {
-        concurrency: 1,
-        pollIntervalMs: 1000,
-        timeoutSecs: serverConfig.inference.jobTimeoutSec,
-      },
-    );
-
-    return worker;
-  }
-}
-
 async function buildPrompt(
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
 ) {
@@ -128,17 +71,6 @@ Content: ${content ?? ""}`,
   throw new Error("Unknown bookmark type");
 }
 
-async function fetchBookmark(linkId: string) {
-  return await db.query.bookmarks.findFirst({
-    where: eq(bookmarks.id, linkId),
-    with: {
-      link: true,
-      text: true,
-      asset: true,
-    },
-  });
-}
-
 async function inferTagsFromImage(
   jobId: string,
   bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
@@ -416,25 +348,29 @@ async function connectTags(
   });
 }
 
-async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
-  const jobId = job.id;
+async function fetchBookmark(linkId: string) {
+  return await db.query.bookmarks.findFirst({
+    where: eq(bookmarks.id, linkId),
+    with: {
+      link: true,
+      text: true,
+      asset: true,
+    },
+  });
+}
 
-  const inferenceClient = InferenceClientFactory.build();
-  if (!inferenceClient) {
-    logger.debug(
-      `[inference][${jobId}] No inference client configured, nothing to do now`,
+export async function runTagging(
+  bookmarkId: string,
+  job: DequeuedJob<ZOpenAIRequest>,
+  inferenceClient: InferenceClient,
+) {
+  if (!serverConfig.inference.enableAutoTagging) {
+    logger.info(
+      `[inference][${job.id}] Skipping tagging job for bookmark with id "${bookmarkId}" because it's disabled in the config.`,
     );
     return;
   }
-
-  const request = zOpenAIRequestSchema.safeParse(job.data);
-  if (!request.success) {
-    throw new Error(
-      `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`,
-    );
-  }
-
-  const { bookmarkId } = request.data;
+  const jobId = job.id;
   const bookmark = await fetchBookmark(bookmarkId);
   if (!bookmark) {
     throw new Error(
diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 427cc383..427cc383 100644
--- a/apps/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
diff --git a/apps/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index e7b827a9..e7b827a9 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
index d4c8abdb..d4c8abdb 100644
--- a/apps/workers/tidyAssetsWorker.ts
+++ b/apps/workers/workers/tidyAssetsWorker.ts
diff --git a/apps/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index b8f85ddf..3fa3e49e 100644
--- a/apps/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -21,8 +21,8 @@ import {
   zvideoRequestSchema,
 } from "@karakeep/shared/queues";
 
-import { withTimeout } from "./utils";
-import { getBookmarkDetails, updateAsset } from "./workerUtils";
+import { withTimeout } from "../utils";
+import { getBookmarkDetails, updateAsset } from "../workerUtils";
 
 const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads");
 
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 9d3ed2c1..9d3ed2c1 100644
--- a/apps/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
