Diffstat (limited to 'apps/workers')
-rw-r--r--  apps/workers/index.ts  27
-rw-r--r--  apps/workers/workers/assetPreprocessingWorker.ts (renamed from apps/workers/assetPreprocessingWorker.ts)  1
-rw-r--r--  apps/workers/workers/crawlerWorker.ts (renamed from apps/workers/crawlerWorker.ts)  5
-rw-r--r--  apps/workers/workers/feedWorker.ts (renamed from apps/workers/feedWorker.ts)  0
-rw-r--r--  apps/workers/workers/inference/inferenceWorker.ts  100
-rw-r--r--  apps/workers/workers/inference/summarize.ts  123
-rw-r--r--  apps/workers/workers/inference/tagging.ts (renamed from apps/workers/openaiWorker.ts)  104
-rw-r--r--  apps/workers/workers/ruleEngineWorker.ts (renamed from apps/workers/ruleEngineWorker.ts)  0
-rw-r--r--  apps/workers/workers/searchWorker.ts (renamed from apps/workers/searchWorker.ts)  0
-rw-r--r--  apps/workers/workers/tidyAssetsWorker.ts (renamed from apps/workers/tidyAssetsWorker.ts)  0
-rw-r--r--  apps/workers/workers/videoWorker.ts (renamed from apps/workers/videoWorker.ts)  4
-rw-r--r--  apps/workers/workers/webhookWorker.ts (renamed from apps/workers/webhookWorker.ts)  0
12 files changed, 264 insertions, 100 deletions
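
Note: the OpenAIQueue.enqueue calls and the new inference worker in this diff assume that zOpenAIRequestSchema in @karakeep/shared/queues now carries a type discriminator next to bookmarkId. That schema change is outside this diff (which is limited to apps/workers); a minimal sketch of the assumed shape, in zod, for reference only:

import { z } from "zod";

// Assumed shape only -- the actual definition lives in @karakeep/shared/queues
// and is not part of this diff.
export const zOpenAIRequestSchema = z.object({
  bookmarkId: z.string(),
  // New discriminator used by runOpenAI() to dispatch to tagging or summarization.
  type: z.enum(["tag", "summarize"]),
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;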
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 208666c7..1cc1ce49 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -1,20 +1,19 @@
import "dotenv/config";
-import { AssetPreprocessingWorker } from "assetPreprocessingWorker";
-import { FeedRefreshingWorker, FeedWorker } from "feedWorker";
-import { RuleEngineWorker } from "ruleEngineWorker";
-import { TidyAssetsWorker } from "tidyAssetsWorker";
-
import serverConfig from "@karakeep/shared/config";
import logger from "@karakeep/shared/logger";
import { runQueueDBMigrations } from "@karakeep/shared/queues";
-import { CrawlerWorker } from "./crawlerWorker";
import { shutdownPromise } from "./exit";
-import { OpenAiWorker } from "./openaiWorker";
-import { SearchIndexingWorker } from "./searchWorker";
-import { VideoWorker } from "./videoWorker";
-import { WebhookWorker } from "./webhookWorker";
+import { AssetPreprocessingWorker } from "./workers/assetPreprocessingWorker";
+import { CrawlerWorker } from "./workers/crawlerWorker";
+import { FeedRefreshingWorker, FeedWorker } from "./workers/feedWorker";
+import { OpenAiWorker } from "./workers/inference/inferenceWorker";
+import { RuleEngineWorker } from "./workers/ruleEngineWorker";
+import { SearchIndexingWorker } from "./workers/searchWorker";
+import { TidyAssetsWorker } from "./workers/tidyAssetsWorker";
+import { VideoWorker } from "./workers/videoWorker";
+import { WebhookWorker } from "./workers/webhookWorker";
async function main() {
logger.info(`Workers version: ${serverConfig.serverVersion ?? "not set"}`);
@@ -22,7 +21,7 @@ async function main() {
const [
crawler,
- openai,
+ inference,
search,
tidyAssets,
video,
@@ -46,7 +45,7 @@ async function main() {
await Promise.any([
Promise.all([
crawler.run(),
- openai.run(),
+ inference.run(),
search.run(),
tidyAssets.run(),
video.run(),
@@ -58,12 +57,12 @@ async function main() {
shutdownPromise,
]);
logger.info(
- "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
+ "Shutting down crawler, inference, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
);
FeedRefreshingWorker.stop();
crawler.stop();
- openai.stop();
+ inference.stop();
search.stop();
tidyAssets.stop();
video.stop();
diff --git a/apps/workers/assetPreprocessingWorker.ts b/apps/workers/workers/assetPreprocessingWorker.ts
index a678b706..0c0b7aec 100644
--- a/apps/workers/assetPreprocessingWorker.ts
+++ b/apps/workers/workers/assetPreprocessingWorker.ts
@@ -330,6 +330,7 @@ async function run(req: DequeuedJob<AssetPreprocessingRequest>) {
if (!isFixMode || anythingChanged) {
await OpenAIQueue.enqueue({
bookmarkId,
+ type: "tag",
});
// Update the search index
diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index a40cbe53..b928e145 100644
--- a/apps/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -859,6 +859,11 @@ async function runCrawler(job: DequeuedJob<ZCrawlLinkRequest>) {
if (job.data.runInference !== false) {
await OpenAIQueue.enqueue({
bookmarkId,
+ type: "tag",
+ });
+ await OpenAIQueue.enqueue({
+ bookmarkId,
+ type: "summarize",
});
}
diff --git a/apps/workers/feedWorker.ts b/apps/workers/workers/feedWorker.ts
index 1eaba0c3..1eaba0c3 100644
--- a/apps/workers/feedWorker.ts
+++ b/apps/workers/workers/feedWorker.ts
diff --git a/apps/workers/workers/inference/inferenceWorker.ts b/apps/workers/workers/inference/inferenceWorker.ts
new file mode 100644
index 00000000..f7492c8b
--- /dev/null
+++ b/apps/workers/workers/inference/inferenceWorker.ts
@@ -0,0 +1,100 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+
+import type { ZOpenAIRequest } from "@karakeep/shared/queues";
+import { db } from "@karakeep/db";
+import { bookmarks } from "@karakeep/db/schema";
+import serverConfig from "@karakeep/shared/config";
+import { InferenceClientFactory } from "@karakeep/shared/inference";
+import logger from "@karakeep/shared/logger";
+import { OpenAIQueue, zOpenAIRequestSchema } from "@karakeep/shared/queues";
+
+import { runSummarization } from "./summarize";
+import { runTagging } from "./tagging";
+
+async function attemptMarkStatus(
+ jobData: object | undefined,
+ status: "success" | "failure",
+) {
+ if (!jobData) {
+ return;
+ }
+ try {
+ const request = zOpenAIRequestSchema.parse(jobData);
+ await db
+ .update(bookmarks)
+ .set({
+ ...(request.type === "summarize"
+ ? { summarizationStatus: status }
+ : {}),
+ ...(request.type === "tag" ? { taggingStatus: status } : {}),
+ })
+ .where(eq(bookmarks.id, request.bookmarkId));
+ } catch (e) {
+ logger.error(`Something went wrong when marking the inference status: ${e}`);
+ }
+}
+
+export class OpenAiWorker {
+ static build() {
+ logger.info("Starting inference worker ...");
+ const worker = new Runner<ZOpenAIRequest>(
+ OpenAIQueue,
+ {
+ run: runOpenAI,
+ onComplete: async (job) => {
+ const jobId = job.id;
+ logger.info(`[inference][${jobId}] Completed successfully`);
+ await attemptMarkStatus(job.data, "success");
+ },
+ onError: async (job) => {
+ const jobId = job.id;
+ logger.error(
+ `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
+ );
+ if (job.numRetriesLeft == 0) {
+ await attemptMarkStatus(job?.data, "failure");
+ }
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: serverConfig.inference.jobTimeoutSec,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
+ const jobId = job.id;
+
+ const inferenceClient = InferenceClientFactory.build();
+ if (!inferenceClient) {
+ logger.debug(
+ `[inference][${jobId}] No inference client configured, nothing to do now`,
+ );
+ return;
+ }
+
+ const request = zOpenAIRequestSchema.safeParse(job.data);
+ if (!request.success) {
+ throw new Error(
+ `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`,
+ );
+ }
+
+ const { bookmarkId } = request.data;
+ switch (request.data.type) {
+ case "summarize":
+ await runSummarization(bookmarkId, job, inferenceClient);
+ break;
+ case "tag":
+ await runTagging(bookmarkId, job, inferenceClient);
+ break;
+ default:
+ throw new Error(`Unknown inference type: ${request.data.type}`);
+ }
+}
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
new file mode 100644
index 00000000..a832fe0a
--- /dev/null
+++ b/apps/workers/workers/inference/summarize.ts
@@ -0,0 +1,123 @@
+import { and, eq } from "drizzle-orm";
+import { DequeuedJob } from "liteque";
+
+import { db } from "@karakeep/db";
+import { bookmarks, customPrompts } from "@karakeep/db/schema";
+import serverConfig from "@karakeep/shared/config";
+import { InferenceClient } from "@karakeep/shared/inference";
+import logger from "@karakeep/shared/logger";
+import { buildSummaryPrompt } from "@karakeep/shared/prompts";
+import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared/queues";
+import { BookmarkTypes } from "@karakeep/shared/types/bookmarks";
+
+async function fetchBookmarkDetailsForSummary(bookmarkId: string) {
+ const bookmark = await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, bookmarkId),
+ columns: { id: true, userId: true, type: true },
+ with: {
+ link: {
+ columns: {
+ title: true,
+ description: true,
+ content: true,
+ publisher: true,
+ author: true,
+ url: true,
+ },
+ },
+ // If assets (like PDFs with extracted text) should be summarized, extend here
+ },
+ });
+
+ if (!bookmark) {
+ throw new Error(`Bookmark with id ${bookmarkId} not found`);
+ }
+ return bookmark;
+}
+
+export async function runSummarization(
+ bookmarkId: string,
+ job: DequeuedJob<ZOpenAIRequest>,
+ inferenceClient: InferenceClient,
+) {
+ if (!serverConfig.inference.enableAutoSummarization) {
+ logger.info(
+ `[inference][${job.id}] Skipping summarization job for bookmark with id "${bookmarkId}" because it's disabled in the config.`,
+ );
+ return;
+ }
+ const jobId = job.id;
+
+ logger.info(
+ `[inference][${jobId}] Starting a summary job for bookmark with id "${bookmarkId}"`,
+ );
+
+ const bookmarkData = await fetchBookmarkDetailsForSummary(bookmarkId);
+
+ let textToSummarize = "";
+ if (bookmarkData.type === BookmarkTypes.LINK && bookmarkData.link) {
+ const link = bookmarkData.link;
+ textToSummarize = `
+Title: ${link.title ?? ""}
+Description: ${link.description ?? ""}
+Content: ${link.content ?? ""}
+Publisher: ${link.publisher ?? ""}
+Author: ${link.author ?? ""}
+URL: ${link.url ?? ""}
+`;
+ } else {
+ logger.warn(
+ `[inference][${jobId}] Bookmark ${bookmarkId} (type: ${bookmarkData.type}) is not a LINK bookmark with content, or its content is missing. Skipping summary.`,
+ );
+ return;
+ }
+
+ if (!textToSummarize.trim()) {
+ logger.info(
+ `[inference][${jobId}] No content to summarize for bookmark ${bookmarkId}.`,
+ );
+ return;
+ }
+
+ const prompts = await db.query.customPrompts.findMany({
+ where: and(
+ eq(customPrompts.userId, bookmarkData.userId),
+ eq(customPrompts.appliesTo, "summary"),
+ ),
+ columns: {
+ text: true,
+ },
+ });
+
+ const summaryPrompt = buildSummaryPrompt(
+ serverConfig.inference.inferredTagLang,
+ prompts.map((p) => p.text),
+ textToSummarize,
+ serverConfig.inference.contextLength,
+ );
+
+ const summaryResult = await inferenceClient.inferFromText(summaryPrompt, {
+ schema: null, // Summaries are typically free-form text
+ abortSignal: job.abortSignal,
+ });
+
+ if (!summaryResult.response) {
+ throw new Error(
+ `[inference][${jobId}] Failed to summarize bookmark ${bookmarkId}, empty response from inference client.`,
+ );
+ }
+
+ logger.info(
+ `[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`,
+ );
+
+ await db
+ .update(bookmarks)
+ .set({
+ summary: summaryResult.response,
+ modifiedAt: new Date(),
+ })
+ .where(eq(bookmarks.id, bookmarkId));
+
+ await triggerSearchReindex(bookmarkId);
+}
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/workers/inference/tagging.ts
index c8b2770e..35c366c7 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,5 +1,5 @@
import { and, Column, eq, inArray, sql } from "drizzle-orm";
-import { DequeuedJob, Runner } from "liteque";
+import { DequeuedJob } from "liteque";
import { buildImpersonatingTRPCClient } from "trpc";
import { z } from "zod";
@@ -14,15 +14,12 @@ import {
} from "@karakeep/db/schema";
import { readAsset } from "@karakeep/shared/assetdb";
import serverConfig from "@karakeep/shared/config";
-import { InferenceClientFactory } from "@karakeep/shared/inference";
import logger from "@karakeep/shared/logger";
import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
import {
- OpenAIQueue,
triggerRuleEngineOnEvent,
triggerSearchReindex,
triggerWebhook,
- zOpenAIRequestSchema,
} from "@karakeep/shared/queues";
const openAIResponseSchema = z.object({
@@ -39,60 +36,6 @@ function tagNormalizer(col: Column) {
sql: sql`lower(replace(replace(replace(${col}, ' ', ''), '-', ''), '_', ''))`,
};
}
-
-async function attemptMarkTaggingStatus(
- jobData: object | undefined,
- status: "success" | "failure",
-) {
- if (!jobData) {
- return;
- }
- try {
- const request = zOpenAIRequestSchema.parse(jobData);
- await db
- .update(bookmarks)
- .set({
- taggingStatus: status,
- })
- .where(eq(bookmarks.id, request.bookmarkId));
- } catch (e) {
- logger.error(`Something went wrong when marking the tagging status: ${e}`);
- }
-}
-
-export class OpenAiWorker {
- static build() {
- logger.info("Starting inference worker ...");
- const worker = new Runner<ZOpenAIRequest>(
- OpenAIQueue,
- {
- run: runOpenAI,
- onComplete: async (job) => {
- const jobId = job.id;
- logger.info(`[inference][${jobId}] Completed successfully`);
- await attemptMarkTaggingStatus(job.data, "success");
- },
- onError: async (job) => {
- const jobId = job.id;
- logger.error(
- `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`,
- );
- if (job.numRetriesLeft == 0) {
- await attemptMarkTaggingStatus(job?.data, "failure");
- }
- },
- },
- {
- concurrency: 1,
- pollIntervalMs: 1000,
- timeoutSecs: serverConfig.inference.jobTimeoutSec,
- },
- );
-
- return worker;
- }
-}
-
async function buildPrompt(
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
) {
@@ -128,17 +71,6 @@ Content: ${content ?? ""}`,
throw new Error("Unknown bookmark type");
}
-async function fetchBookmark(linkId: string) {
- return await db.query.bookmarks.findFirst({
- where: eq(bookmarks.id, linkId),
- with: {
- link: true,
- text: true,
- asset: true,
- },
- });
-}
-
async function inferTagsFromImage(
jobId: string,
bookmark: NonNullable<Awaited<ReturnType<typeof fetchBookmark>>>,
@@ -416,25 +348,29 @@ async function connectTags(
});
}
-async function runOpenAI(job: DequeuedJob<ZOpenAIRequest>) {
- const jobId = job.id;
+async function fetchBookmark(linkId: string) {
+ return await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, linkId),
+ with: {
+ link: true,
+ text: true,
+ asset: true,
+ },
+ });
+}
- const inferenceClient = InferenceClientFactory.build();
- if (!inferenceClient) {
- logger.debug(
- `[inference][${jobId}] No inference client configured, nothing to do now`,
+export async function runTagging(
+ bookmarkId: string,
+ job: DequeuedJob<ZOpenAIRequest>,
+ inferenceClient: InferenceClient,
+) {
+ if (!serverConfig.inference.enableAutoTagging) {
+ logger.info(
+ `[inference][${job.id}] Skipping tagging job for bookmark with id "${bookmarkId}" because it's disabled in the config.`,
);
return;
}
-
- const request = zOpenAIRequestSchema.safeParse(job.data);
- if (!request.success) {
- throw new Error(
- `[inference][${jobId}] Got malformed job request: ${request.error.toString()}`,
- );
- }
-
- const { bookmarkId } = request.data;
+ const jobId = job.id;
const bookmark = await fetchBookmark(bookmarkId);
if (!bookmark) {
throw new Error(
diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/workers/ruleEngineWorker.ts
index 427cc383..427cc383 100644
--- a/apps/workers/ruleEngineWorker.ts
+++ b/apps/workers/workers/ruleEngineWorker.ts
diff --git a/apps/workers/searchWorker.ts b/apps/workers/workers/searchWorker.ts
index e7b827a9..e7b827a9 100644
--- a/apps/workers/searchWorker.ts
+++ b/apps/workers/workers/searchWorker.ts
diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/workers/tidyAssetsWorker.ts
index d4c8abdb..d4c8abdb 100644
--- a/apps/workers/tidyAssetsWorker.ts
+++ b/apps/workers/workers/tidyAssetsWorker.ts
diff --git a/apps/workers/videoWorker.ts b/apps/workers/workers/videoWorker.ts
index b8f85ddf..3fa3e49e 100644
--- a/apps/workers/videoWorker.ts
+++ b/apps/workers/workers/videoWorker.ts
@@ -21,8 +21,8 @@ import {
zvideoRequestSchema,
} from "@karakeep/shared/queues";
-import { withTimeout } from "./utils";
-import { getBookmarkDetails, updateAsset } from "./workerUtils";
+import { withTimeout } from "../utils";
+import { getBookmarkDetails, updateAsset } from "../workerUtils";
const TMP_FOLDER = path.join(os.tmpdir(), "video_downloads");
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/workers/webhookWorker.ts
index 9d3ed2c1..9d3ed2c1 100644
--- a/apps/workers/webhookWorker.ts
+++ b/apps/workers/workers/webhookWorker.ts
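
The status bookkeeping in inferenceWorker.ts (summarizationStatus, taggingStatus) and the summary column written by runSummarization live in @karakeep/db/schema, which this diff does not touch. A hypothetical drizzle excerpt of the columns those update() calls assume, for orientation only (column names and enum values are inferred from the worker code above, not taken from the actual schema):

import { integer, sqliteTable, text } from "drizzle-orm/sqlite-core";

// Hypothetical excerpt of the bookmarks table -- inferred from the workers'
// update() calls, not part of this diff.
export const bookmarks = sqliteTable("bookmarks", {
  id: text("id").primaryKey(),
  summary: text("summary"),
  taggingStatus: text("taggingStatus", { enum: ["pending", "success", "failure"] }),
  summarizationStatus: text("summarizationStatus", { enum: ["pending", "success", "failure"] }),
  modifiedAt: integer("modifiedAt", { mode: "timestamp" }),
});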