1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
import { eq } from "drizzle-orm";
import { DequeuedJob, Runner } from "liteque";
import { buildImpersonatingAuthedContext } from "trpc";
import type { ZRuleEngineRequest } from "@karakeep/shared/queues";
import { db } from "@karakeep/db";
import { bookmarks } from "@karakeep/db/schema";
import logger from "@karakeep/shared/logger";
import {
RuleEngineQueue,
zRuleEngineRequestSchema,
} from "@karakeep/shared/queues";
import { RuleEngine } from "@karakeep/trpc/lib/ruleEngine";
export class RuleEngineWorker {
static build() {
logger.info("Starting rule engine worker ...");
const worker = new Runner<ZRuleEngineRequest>(
RuleEngineQueue,
{
run: runRuleEngine,
onComplete: (job) => {
const jobId = job.id;
logger.info(`[ruleEngine][${jobId}] Completed successfully`);
return Promise.resolve();
},
onError: (job) => {
const jobId = job.id;
logger.error(
`[ruleEngine][${jobId}] rule engine job failed: ${job.error}\n${job.error.stack}`,
);
return Promise.resolve();
},
},
{
concurrency: 1,
pollIntervalMs: 1000,
timeoutSecs: 10,
validator: zRuleEngineRequestSchema,
},
);
return worker;
}
}
async function getBookmarkUserId(bookmarkId: string) {
return await db.query.bookmarks.findFirst({
where: eq(bookmarks.id, bookmarkId),
columns: {
userId: true,
},
});
}
async function runRuleEngine(job: DequeuedJob<ZRuleEngineRequest>) {
const jobId = job.id;
const { bookmarkId, events } = job.data;
const bookmark = await getBookmarkUserId(bookmarkId);
if (!bookmark) {
throw new Error(
`[ruleEngine][${jobId}] bookmark with id ${bookmarkId} was not found`,
);
}
const userId = bookmark.userId;
const authedCtx = await buildImpersonatingAuthedContext(userId);
const ruleEngine = await RuleEngine.forBookmark(authedCtx, bookmarkId);
const results = (
await Promise.all(events.map((event) => ruleEngine.onEvent(event)))
).flat();
if (results.length == 0) {
return;
}
const message = results
.map((result) => `${result.ruleId}, (${result.type}): ${result.message}`)
.join("\n");
logger.info(
`[ruleEngine][${jobId}] Rule engine job for bookmark ${bookmarkId} completed with results: ${message}`,
);
}
|