From 136f126296af65f50da598d084d1485c0e40437a Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 27 Apr 2025 00:02:20 +0100 Subject: feat: Implement generic rule engine (#1318) * Add schema for the new rule engine * Add rule engine backend logic * Implement the worker logic and event firing * Implement the UI changesfor the rule engine * Ensure that when a referenced list or tag are deleted, the corresponding event/action is * Dont show smart lists in rule engine events * Add privacy validations for attached tag and list ids * Move the rules logic into a models --- apps/workers/ruleEngineWorker.ts | 86 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 apps/workers/ruleEngineWorker.ts (limited to 'apps/workers/ruleEngineWorker.ts') diff --git a/apps/workers/ruleEngineWorker.ts b/apps/workers/ruleEngineWorker.ts new file mode 100644 index 00000000..427cc383 --- /dev/null +++ b/apps/workers/ruleEngineWorker.ts @@ -0,0 +1,86 @@ +import { eq } from "drizzle-orm"; +import { DequeuedJob, Runner } from "liteque"; +import { buildImpersonatingAuthedContext } from "trpc"; + +import type { ZRuleEngineRequest } from "@karakeep/shared/queues"; +import { db } from "@karakeep/db"; +import { bookmarks } from "@karakeep/db/schema"; +import logger from "@karakeep/shared/logger"; +import { + RuleEngineQueue, + zRuleEngineRequestSchema, +} from "@karakeep/shared/queues"; +import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine"; + +export class RuleEngineWorker { + static build() { + logger.info("Starting rule engine worker ..."); + const worker = new Runner( + RuleEngineQueue, + { + run: runRuleEngine, + onComplete: (job) => { + const jobId = job.id; + logger.info(`[ruleEngine][${jobId}] Completed successfully`); + return Promise.resolve(); + }, + onError: (job) => { + const jobId = job.id; + logger.error( + `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`, + ); + return Promise.resolve(); + }, + }, + { + concurrency: 1, + pollIntervalMs: 1000, + timeoutSecs: 10, + validator: zRuleEngineRequestSchema, + }, + ); + + return worker; + } +} + +async function getBookmarkUserId(bookmarkId: string) { + return await db.query.bookmarks.findFirst({ + where: eq(bookmarks.id, bookmarkId), + columns: { + userId: true, + }, + }); +} + +async function runRuleEngine(job: DequeuedJob) { + const jobId = job.id; + const { bookmarkId, events } = job.data; + + const bookmark = await getBookmarkUserId(bookmarkId); + if (!bookmark) { + throw new Error( + `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`, + ); + } + const userId = bookmark.userId; + const authedCtx = await buildImpersonatingAuthedContext(userId); + + const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId); + + const results = ( + await Promise.all(events.map((event) => ruleEngine.onEvent(event))) + ).flat(); + + if (results.length == 0) { + return; + } + + const message = results + .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`) + .join("\n"); + + logger.info( + `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`, + ); +} -- cgit v1.2.3-70-g09d2