diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-04-27 00:02:20 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-04-27 00:02:20 +0100 |
| commit | 136f126296af65f50da598d084d1485c0e40437a (patch) | |
| tree | 2725c7932ebbcb9b48b5af98eb9b72329a400260 /apps/workers | |
| parent | ca47be7fe7be128f459c37614a04902a873fe289 (diff) | |
| download | karakeep-136f126296af65f50da598d084d1485c0e40437a.tar.zst | |
feat: Implement generic rule engine (#1318)
* Add schema for the new rule engine
* Add rule engine backend logic
* Implement the worker logic and event firing
* Implement the UI changesfor the rule engine
* Ensure that when a referenced list or tag are deleted, the corresponding event/action is
* Dont show smart lists in rule engine events
* Add privacy validations for attached tag and list ids
* Move the rules logic into a models
Diffstat (limited to 'apps/workers')
| -rw-r--r-- | apps/workers/index.ts | 7 | ||||
| -rw-r--r-- | apps/workers/openaiWorker.ts | 22 | ||||
| -rw-r--r-- | apps/workers/ruleEngineWorker.ts | 86 | ||||
| -rw-r--r-- | apps/workers/trpc.ts | 21 | ||||
| -rw-r--r-- | apps/workers/webhookWorker.ts | 13 |
5 files changed, 133 insertions, 16 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 207c7f64..208666c7 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -2,6 +2,7 @@ import "dotenv/config"; import { AssetPreprocessingWorker } from "assetPreprocessingWorker"; import { FeedRefreshingWorker, FeedWorker } from "feedWorker"; +import { RuleEngineWorker } from "ruleEngineWorker"; import { TidyAssetsWorker } from "tidyAssetsWorker"; import serverConfig from "@karakeep/shared/config"; @@ -28,6 +29,7 @@ async function main() { feed, assetPreprocessing, webhook, + ruleEngine, ] = [ await CrawlerWorker.build(), OpenAiWorker.build(), @@ -37,6 +39,7 @@ async function main() { FeedWorker.build(), AssetPreprocessingWorker.build(), WebhookWorker.build(), + RuleEngineWorker.build(), ]; FeedRefreshingWorker.start(); @@ -50,11 +53,12 @@ async function main() { feed.run(), assetPreprocessing.run(), webhook.run(), + ruleEngine.run(), ]), shutdownPromise, ]); logger.info( - "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook and search workers ...", + "Shutting down crawler, openai, tidyAssets, video, feed, assetPreprocessing, webhook, ruleEngine and search workers ...", ); FeedRefreshingWorker.stop(); @@ -66,6 +70,7 @@ async function main() { feed.stop(); assetPreprocessing.stop(); webhook.stop(); + ruleEngine.stop(); } main(); diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts index 7b0ae095..c8b2770e 100644 --- a/apps/workers/openaiWorker.ts +++ b/apps/workers/openaiWorker.ts @@ -19,6 +19,7 @@ import logger from "@karakeep/shared/logger"; import { buildImagePrompt, buildTextPrompt } from "@karakeep/shared/prompts"; import { OpenAIQueue, + triggerRuleEngineOnEvent, triggerSearchReindex, triggerWebhook, zOpenAIRequestSchema, @@ -377,19 +378,20 @@ async function connectTags( } // Delete old AI tags - await tx + const detachedTags = await tx .delete(tagsOnBookmarks) .where( and( eq(tagsOnBookmarks.attachedBy, "ai"), eq(tagsOnBookmarks.bookmarkId, bookmarkId), ), - ); + ) + .returning(); const allTagIds = new Set([...matchedTagIds, ...newTagIds]); // Attach new ones - await tx + const attachedTags = await tx .insert(tagsOnBookmarks) .values( [...allTagIds].map((tagId) => ({ @@ -398,7 +400,19 @@ async function connectTags( attachedBy: "ai" as const, })), ) - .onConflictDoNothing(); + .onConflictDoNothing() + .returning(); + + await triggerRuleEngineOnEvent(bookmarkId, [ + ...detachedTags.map((t) => ({ + type: "tagRemoved" as const, + tagId: t.tagId, + })), + ...attachedTags.map((t) => ({ + type: "tagAdded" as const, + tagId: t.tagId, + })), + ]); }); } diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/ruleEngineWorker.ts new file mode 100644 index 00000000..427cc383 --- /dev/null +++ b/apps/workers/ruleEngineWorker.ts @@ -0,0 +1,86 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import { buildImpersonatingAuthedContext } from "trpc"; + +import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { + RuleEngineQueue, + zRuleEngineRequestSchema, +} from "@karakeep/shared/queues"; +import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; + +export class RuleEngineWorker { + static build() { + logger.info("Starting rule engine worker ..."); + const worker = new Runner<ZRuleEngineRequest>( + RuleEngineQueue, + { + run: runRuleEngine, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[ruleEngine][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 10, + validator: zRuleEngineRequestSchema, + }, + ); + + return worker; + } +} + +async function getBookmarkUserId(bookmarkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + columns: { + userId: true, + }, + }); +} + +async function runRuleEngine(job: DequeuedJob<ZRuleEngineRequest>) { + const jobId = job.id; + const { bookmarkId, events } = job.data; + + const bookmark = await getBookmarkUserId(bookmarkId); + if (!bookmark) { + throw new Error( + `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + const userId = bookmark.userId; + const authedCtx = await buildImpersonatingAuthedContext(userId); + + const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); + + const results = ( + await Promise.all(events.map((event) => ruleEngine.onEvent(event))) + ).flat(); + + if (results.length == 0) { + return; + } + + const message = results + .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`) + .join("\n"); + + logger.info( + `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`, + ); +} diff --git a/apps/workers/trpc.ts b/apps/workers/trpc.ts index 8bae287a..c5f880ad 100644 --- a/apps/workers/trpc.ts +++ b/apps/workers/trpc.ts @@ -2,15 +2,15 @@ import { eq } from "drizzle-orm"; import { db } from "@karakeep/db"; import { users } from "@karakeep/db/schema"; -import { createCallerFactory } from "@karakeep/trpc"; +import { AuthedContext, createCallerFactory } from "@karakeep/trpc"; import { appRouter } from "@karakeep/trpc/routers/_app"; /** * This is only safe to use in the context of a worker. */ -export async function buildImpersonatingTRPCClient(userId: string) { - const createCaller = createCallerFactory(appRouter); - +export async function buildImpersonatingAuthedContext( + userId: string, +): Promise<AuthedContext> { const user = await db.query.users.findFirst({ where: eq(users.id, userId), }); @@ -18,7 +18,7 @@ export async function buildImpersonatingTRPCClient(userId: string) { throw new Error("User not found"); } - return createCaller({ + return { user: { id: user.id, name: user.name, @@ -29,5 +29,14 @@ export async function buildImpersonatingTRPCClient(userId: string) { req: { ip: null, }, - }); + }; +} + +/** + * This is only safe to use in the context of a worker. + */ +export async function buildImpersonatingTRPCClient(userId: string) { + const createCaller = createCallerFactory(appRouter); + + return createCaller(await buildImpersonatingAuthedContext(userId)); } diff --git a/apps/workers/webhookWorker.ts b/apps/workers/webhookWorker.ts index fb8227e3..9d3ed2c1 100644 --- a/apps/workers/webhookWorker.ts +++ b/apps/workers/webhookWorker.ts @@ -47,14 +47,17 @@ export class WebhookWorker { } } -async function fetchBookmark(linkId: string) { +async function fetchBookmark(bookmarkId: string) { return await db.query.bookmarks.findFirst({ - where: eq(bookmarks.id, linkId), + where: eq(bookmarks.id, bookmarkId), with: { - link: true, - text: true, - asset: true, + link: { + columns: { + url: true, + }, + }, user: { + columns: {}, with: { webhooks: true, }, |
