diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-07-06 18:07:56 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-07-06 18:09:05 +0000 |
| commit | b60ece578304df21602d39c7022a7a4dbc6437e0 (patch) | |
| tree | a5e3395b0b3b5c9bb01566bf68aa21334fd13784 /packages/trpc | |
| parent | cfa0385b4dcd37f9cc29a15f94a59a4f48dd05fb (diff) | |
| download | karakeep-b60ece578304df21602d39c7022a7a4dbc6437e0.tar.zst | |
feat: Add prometheus monitoring. Fixes #758
Diffstat (limited to 'packages/trpc')
| -rw-r--r-- | packages/trpc/index.ts | 42 | ||||
| -rw-r--r-- | packages/trpc/package.json | 1 | ||||
| -rw-r--r-- | packages/trpc/stats.ts | 142 |
3 files changed, 177 insertions, 8 deletions
```diff
diff --git a/packages/trpc/index.ts b/packages/trpc/index.ts
index e34e56eb..90f37ae4 100644
--- a/packages/trpc/index.ts
+++ b/packages/trpc/index.ts
@@ -5,6 +5,12 @@ import { ZodError } from "zod";
 import type { db } from "@karakeep/db";
 import serverConfig from "@karakeep/shared/config";
 
+import {
+  apiErrorsTotalCounter,
+  apiRequestDurationSummary,
+  apiRequestsTotalCounter,
+} from "./stats";
+
 interface User {
   id: string;
   name?: string | null | undefined;
@@ -36,6 +42,11 @@ const t = initTRPC.context<Context>().create({
   transformer: superjson,
   errorFormatter(opts) {
     const { shape, error } = opts;
+    apiErrorsTotalCounter.inc({
+      type: opts.type,
+      path: opts.path,
+      code: error.code,
+    });
     return {
       ...shape,
       data: {
@@ -51,15 +62,30 @@ const t = initTRPC.context<Context>().create({
 export const createCallerFactory = t.createCallerFactory;
 // Base router and procedure helpers
 export const router = t.router;
-export const procedure = t.procedure.use(function isDemoMode(opts) {
-  if (serverConfig.demoMode && opts.type == "mutation") {
-    throw new TRPCError({
-      message: "Mutations are not allowed in demo mode",
-      code: "FORBIDDEN",
+export const procedure = t.procedure
+  .use(function isDemoMode(opts) {
+    if (serverConfig.demoMode && opts.type == "mutation") {
+      throw new TRPCError({
+        message: "Mutations are not allowed in demo mode",
+        code: "FORBIDDEN",
+      });
+    }
+    return opts.next();
+  })
+  .use(async (opts) => {
+    const end = apiRequestDurationSummary.startTimer({
+      path: opts.path,
+      type: opts.type,
     });
-  }
-  return opts.next();
-});
+    const res = await opts.next();
+    apiRequestsTotalCounter.inc({
+      type: opts.type,
+      path: opts.path,
+      is_error: res.ok ? 0 : 1,
+    });
+    end();
+    return res;
+  });
 export const publicProcedure = procedure;
 
 export const authedProcedure = procedure.use(function isAuthed(opts) {
diff --git a/packages/trpc/package.json b/packages/trpc/package.json
index b0280d6d..f4a9d122 100644
--- a/packages/trpc/package.json
+++ b/packages/trpc/package.json
@@ -19,6 +19,7 @@
     "bcryptjs": "^2.4.3",
     "deep-equal": "^2.2.3",
     "drizzle-orm": "^0.38.3",
+    "prom-client": "^15.1.3",
     "superjson": "^2.2.1",
     "tiny-invariant": "^1.3.3",
     "zod": "^3.24.2"
diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts
new file mode 100644
index 00000000..465bddcd
--- /dev/null
+++ b/packages/trpc/stats.ts
@@ -0,0 +1,142 @@
+import { count, sum } from "drizzle-orm";
+import { Counter, Gauge, register, Summary } from "prom-client";
+
+import { db } from "@karakeep/db";
+import { assets, bookmarks, users } from "@karakeep/db/schema";
+import {
+  AssetPreprocessingQueue,
+  FeedQueue,
+  LinkCrawlerQueue,
+  OpenAIQueue,
+  RuleEngineQueue,
+  SearchIndexingQueue,
+  TidyAssetsQueue,
+  VideoWorkerQueue,
+  WebhookQueue,
+} from "@karakeep/shared/queues";
+
+// Queue metrics
+const queuePendingJobsGauge = new Gauge({
+  name: "queue_jobs",
+  help: "Number of jobs in each background queue",
+  labelNames: ["queue_name", "status"],
+  async collect() {
+    const queues = [
+      { name: "link_crawler", queue: LinkCrawlerQueue },
+      { name: "openai", queue: OpenAIQueue },
+      { name: "search_indexing", queue: SearchIndexingQueue },
+      { name: "tidy_assets", queue: TidyAssetsQueue },
+      { name: "video_worker", queue: VideoWorkerQueue },
+      { name: "feed", queue: FeedQueue },
+      { name: "asset_preprocessing", queue: AssetPreprocessingQueue },
+      { name: "webhook", queue: WebhookQueue },
+      { name: "rule_engine", queue: RuleEngineQueue },
+    ];
+
+    const stats = await Promise.all(
+      queues.map(async ({ name, queue }) => {
+        try {
+          return {
+            ...(await queue.stats()),
+            name,
+          };
+        } catch (error) {
+          console.error(`Failed to get stats for queue ${name}:`, error);
+          return { name, pending: 0, pending_retry: 0, failed: 0, running: 0 };
+        }
+      }),
+    );
+
+    stats.forEach(({ name, pending, pending_retry, failed, running }) => {
+      this.set({ queue_name: name, status: "pending" }, pending);
+      this.set({ queue_name: name, status: "pending_retry" }, pending_retry);
+      this.set({ queue_name: name, status: "failed" }, failed);
+      this.set({ queue_name: name, status: "running" }, running);
+    });
+  },
+});
+
+// User metrics
+const totalUsersGauge = new Gauge({
+  name: "total_users",
+  help: "Total number of users in the system",
+  async collect() {
+    try {
+      const result = await db.select({ count: count() }).from(users);
+      this.set(result[0]?.count ?? 0);
+    } catch (error) {
+      console.error("Failed to get user count:", error);
+      this.set(0);
+    }
+  },
+});
+
+// Asset metrics
+const totalAssetSizeGauge = new Gauge({
+  name: "total_asset_size_bytes",
+  help: "Total size of all assets in bytes",
+  async collect() {
+    try {
+      const result = await db
+        .select({ totalSize: sum(assets.size) })
+        .from(assets);
+      this.set(Number(result[0]?.totalSize ?? 0));
+    } catch (error) {
+      console.error("Failed to get total asset size:", error);
+      this.set(0);
+    }
+  },
+});
+
+// Bookmark metrics
+const totalBookmarksGauge = new Gauge({
+  name: "total_bookmarks",
+  help: "Total number of bookmarks in the system",
+  async collect() {
+    try {
+      const result = await db.select({ count: count() }).from(bookmarks);
+      this.set(result[0]?.count ?? 0);
+    } catch (error) {
+      console.error("Failed to get bookmark count:", error);
+      this.set(0);
+    }
+  },
+});
+
+// Api metrics
+const apiRequestsTotalCounter = new Counter({
+  name: "trpc_requests_total",
+  help: "Total number of API requests",
+  labelNames: ["type", "path", "is_error"],
+});
+
+const apiErrorsTotalCounter = new Counter({
+  name: "trpc_errors_total",
+  help: "Total number of API requests",
+  labelNames: ["type", "path", "code"],
+});
+
+const apiRequestDurationSummary = new Summary({
+  name: "trpc_request_duration_seconds",
+  help: "Duration of tRPC requests in seconds",
+  labelNames: ["type", "path"],
+});
+
+// Register all metrics
+register.registerMetric(queuePendingJobsGauge);
+register.registerMetric(totalUsersGauge);
+register.registerMetric(totalAssetSizeGauge);
+register.registerMetric(totalBookmarksGauge);
+register.registerMetric(apiRequestsTotalCounter);
+register.registerMetric(apiErrorsTotalCounter);
+register.registerMetric(apiRequestDurationSummary);
+
+export {
+  queuePendingJobsGauge,
+  totalUsersGauge,
+  totalAssetSizeGauge,
+  totalBookmarksGauge,
+  apiRequestsTotalCounter,
+  apiErrorsTotalCounter,
+  apiRequestDurationSummary,
+};
```
