aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/ruleEngineWorker.ts
blob: 427cc383f85842060e4563a2e8e757e42240d831 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
import { buildImpersonatingAuthedContext } from "trpc";

import type { ZRuleEngineRequest } from "@karakeep/shared/queues";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
import logger from "@karakeep/shared/logger";
import {
  RuleEngineQueue,
  zRuleEngineRequestSchema,
} from "@karakeep/shared/queues";
import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine";

export class RuleEngineWorker {
  static build() {
    logger.info("Starting rule engine worker ...");
    const worker = new Runner<ZRuleEngineRequest>(
      RuleEngineQueue,
      {
        run: runRuleEngine,
        onComplete: (job) => {
          const jobId = job.id;
          logger.info(`[ruleEngine][${jobId}] Completed successfully`);
          return Promise.resolve();
        },
        onError: (job) => {
          const jobId = job.id;
          logger.error(
            `[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`,
          );
          return Promise.resolve();
        },
      },
      {
        concurrency: 1,
        pollIntervalMs: 1000,
        timeoutSecs: 10,
        validator: zRuleEngineRequestSchema,
      },
    );

    return worker;
  }
}

async function getBookmarkUserId(bookmarkId: string) {
  return await db.query.bookmarks.findFirst({
    where: eq(bookmarks.id, bookmarkId),
    columns: {
      userId: true,
    },
  });
}

async function runRuleEngine(job: DequeuedJob<ZRuleEngineRequest>) {
  const jobId = job.id;
  const { bookmarkId, events } = job.data;

  const bookmark = await getBookmarkUserId(bookmarkId);
  if (!bookmark) {
    throw new Error(
      `[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`,
    );
  }
  const userId = bookmark.userId;
  const authedCtx = await buildImpersonatingAuthedContext(userId);

  const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId);

  const results = (
    await Promise.all(events.map((event) => ruleEngine.onEvent(event)))
  ).flat();

  if (results.length == 0) {
    return;
  }

  const message = results
    .map((result) => `${result.ruleId}, (${result.type}): ${result.message}`)
    .join("\n");

  logger.info(
    `[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`,
  );
}