aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers
diff options
context:
space:
mode:
author Mohamed Bassem <me@mbassem.com> 2025-04-27 00:02:20 +0100
committer GitHub <noreply@github.com> 2025-04-27 00:02:20 +0100
commit 136f126296af65f50da598d084d1485c0e40437a (patch)
tree 2725c7932ebbcb9b48b5af98eb9b72329a400260 /apps/workers
parent ca47be7fe7be128f459c37614a04902a873fe289 (diff)
download karakeep-136f126296af65f50da598d084d1485c0e40437a.tar.zst
feat: Implement generic rule engine (#1318)
* Add schema for the new rule engine
* Add rule engine backend logic
* Implement the worker logic and event firing
* Implement the UI changes for the rule engine
* Ensure that when a referenced list or tag is deleted, the corresponding event/action is
* Don't show smart lists in rule engine events
* Add privacy validations for attached tag and list ids
* Move the rules logic into a model
Diffstat (limited to 'apps/workers')
-rw-r--r-- apps/workers/index.ts 7
-rw-r--r-- apps/workers/openaiWorker.ts 22
-rw-r--r-- apps/workers/ruleEngineWorker.ts 86
-rw-r--r-- apps/workers/trpc.ts 21
-rw-r--r-- apps/workers/webhookWorker.ts 13
5 files changed, 133 insertions, 16 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts
index 207c7f64..208666c7 100644
--- a/apps/workers/index.ts
+++ b/apps/workers/index.ts
@@ -2,6 +2,7 @@ import "dotenv/config";
import { AssetPreprocessingWorker } from "assetPreprocessingWorker";
import { FeedRefreshingWorker, FeedWorker } from "feedWorker";
+import { RuleEngineWorker } from "ruleEngineWorker";
import { TidyAssetsWorker } from "tidyAssetsWorker";
import serverConfig from "@karakeep/shared/config";
@@ -28,6 +29,7 @@ async function main() {
feed,
assetPreprocessing,
webhook,
+ ruleEngine,
] = [
await CrawlerWorker.build(),
OpenAiWorker.build(),
@@ -37,6 +39,7 @@ async function main() {
FeedWorker.build(),
AssetPreprocessingWorker.build(),
WebhookWorker.build(),
+ RuleEngineWorker.build(),
];
FeedRefreshingWorker.start();
@@ -50,11 +53,12 @@ async function main() {
feed.run(),
assetPreprocessing.run(),
webhook.run(),
+ ruleEngine.run(),
]),
shutdownPromise,
]);
logger.info(
- "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...",
+ "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...",
);
FeedRefreshingWorker.stop();
@@ -66,6 +70,7 @@ async function main() {
feed.stop();
assetPreprocessing.stop();
webhook.stop();
+ ruleEngine.stop();
}
main();
diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts
index 7b0ae095..c8b2770e 100644
--- a/apps/workers/openaiWorker.ts
+++ b/apps/workers/openaiWorker.ts
@@ -19,6 +19,7 @@ import logger from "@karakeep/shared/logger";
import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts";
import {
OpenAIQueue,
+ triggerRuleEngineOnEvent,
triggerSearchReindex,
triggerWebhook,
zOpenAIRequestSchema,
@@ -377,19 +378,20 @@ async function connectTags(
}
// Delete old AI tags
- await tx
+ const detachedTags = await tx
.delete(tagsOnBookmarks)
.where(
and(
eq(tagsOnBookmarks.attachedBy, "ai"),
eq(tagsOnBookmarks.bookmarkId, bookmarkId),
),
- );
+ )
+ .returning();
const allTagIds = new Set([...matchedTagIds, ...newTagIds]);
// Attach new ones
- await tx
+ const attachedTags = await tx
.insert(tagsOnBookmarks)
.values(
[...allTagIds].map((tagId) => ({
@@ -398,7 +400,19 @@ async function connectTags(
attachedBy: "ai" as const,
})),
)
- .onConflictDoNothing();
+ .onConflictDoNothing()
+ .returning();
+
+ await triggerRuleEngineOnEvent(bookmarkId, [
+ ...detachedTags.map((t) => ({
+ type: "tagRemoved" as const,
+ tagId: t.tagId,
+ })),
+ ...attachedTags.map((t) => ({
+ type: "tagAdded" as const,
+ tagId: t.tagId,
+ })),
+ ]);
});
}
diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/ruleEngineWorker.ts
new file mode 100644
index 00000000..427cc383
--- /dev/null
+++ b/apps/workers/ruleEngineWorker.ts
@@ -0,0 +1,86 @@
+import { eq } from "drizzle-orm";
+import { DequeuedJob, Runner } from "liteque";
+import { buildImpersonatingAuthedContext } from "trpc";
+
+import type { ZRuleEngineRequest } from "@karakeep/shared/queues";
+import { db } from "@karakeep/db";
+import { bookmarks } from "@karakeep/db/schema";
+import logger from "@karakeep/shared/logger";
+import {
+ RuleEngineQueue,
+ zRuleEngineRequestSchema,
+} from "@karakeep/shared/queues";
+import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine";
+
+export class RuleEngineWorker {
+ static build() {
+ logger.info("Starting rule engine worker ...");
+ const worker = new Runner<ZRuleEngineRequest>(
+ RuleEngineQueue,
+ {
+ run: runRuleEngine,
+ onComplete: (job) => {
+ const jobId = job.id;
+ logger.info(`[ruleEngine][${jobId}] Completed successfully`);
+ return Promise.resolve();
+ },
+ onError: (job) => {
+ const jobId = job.id;
+ logger.error(
+ `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`,
+ );
+ return Promise.resolve();
+ },
+ },
+ {
+ concurrency: 1,
+ pollIntervalMs: 1000,
+ timeoutSecs: 10,
+ validator: zRuleEngineRequestSchema,
+ },
+ );
+
+ return worker;
+ }
+}
+
+async function getBookmarkUserId(bookmarkId: string) {
+ return await db.query.bookmarks.findFirst({
+ where: eq(bookmarks.id, bookmarkId),
+ columns: {
+ userId: true,
+ },
+ });
+}
+
+async function runRuleEngine(job: DequeuedJob<ZRuleEngineRequest>) {
+ const jobId = job.id;
+ const { bookmarkId, events } = job.data;
+
+ const bookmark = await getBookmarkUserId(bookmarkId);
+ if (!bookmark) {
+ throw new Error(
+ `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`,
+ );
+ }
+ const userId = bookmark.userId;
+ const authedCtx = await buildImpersonatingAuthedContext(userId);
+
+ const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId);
+
+ const results = (
+ await Promise.all(events.map((event) => ruleEngine.onEvent(event)))
+ ).flat();
+
+ if (results.length == 0) {
+ return;
+ }
+
+ const message = results
+ .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`)
+ .join("\n");
+
+ logger.info(
+ `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`,
+ );
+}
diff --git a/apps/workers/trpc.ts b/apps/workers/trpc.ts
index 8bae287a..c5f880ad 100644
--- a/apps/workers/trpc.ts
+++ b/apps/workers/trpc.ts
@@ -2,15 +2,15 @@ import { eq } from "drizzle-orm";
import { db } from "@karakeep/db";
import { users } from "@karakeep/db/schema";
-import { createCallerFactory } from "@karakeep/trpc";
+import { AuthedContext, createCallerFactory } from "@karakeep/trpc";
import { appRouter } from "@karakeep/trpc/routers/_app";
/**
* This is only safe to use in the context of a worker.
*/
-export async function buildImpersonatingTRPCClient(userId: string) {
- const createCaller = createCallerFactory(appRouter);
-
+export async function buildImpersonatingAuthedContext(
+ userId: string,
+): Promise<AuthedContext> {
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
});
@@ -18,7 +18,7 @@ export async function buildImpersonatingTRPCClient(userId: string) {
throw new Error("User not found");
}
- return createCaller({
+ return {
user: {
id: user.id,
name: user.name,
@@ -29,5 +29,14 @@ export async function buildImpersonatingTRPCClient(userId: string) {
req: {
ip: null,
},
- });
+ };
+}
+
+/**
+ * This is only safe to use in the context of a worker.
+ */
+export async function buildImpersonatingTRPCClient(userId: string) {
+ const createCaller = createCallerFactory(appRouter);
+
+ return createCaller(await buildImpersonatingAuthedContext(userId));
}
diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts
index fb8227e3..9d3ed2c1 100644
--- a/apps/workers/webhookWorker.ts
+++ b/apps/workers/webhookWorker.ts
@@ -47,14 +47,17 @@ export class WebhookWorker {
}
}
-async function fetchBookmark(linkId: string) {
+async function fetchBookmark(bookmarkId: string) {
return await db.query.bookmarks.findFirst({
- where: eq(bookmarks.id, linkId),
+ where: eq(bookmarks.id, bookmarkId),
with: {
- link: true,
- text: true,
- asset: true,
+ link: {
+ columns: {
+ url: true,
+ },
+ },
user: {
+ columns: {},
with: {
webhooks: true,
},