aboutsummaryrefslogtreecommitdiffstats
path: root/packages/trpc/stats.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-07-06 18:07:56 +0000
committerMohamed Bassem <me@mbassem.com>2025-07-06 18:09:05 +0000
commitb60ece578304df21602d39c7022a7a4dbc6437e0 (patch)
treea5e3395b0b3b5c9bb01566bf68aa21334fd13784 /packages/trpc/stats.ts
parentcfa0385b4dcd37f9cc29a15f94a59a4f48dd05fb (diff)
downloadkarakeep-b60ece578304df21602d39c7022a7a4dbc6437e0.tar.zst
feat: Add prometheus monitoring. Fixes #758
Diffstat (limited to 'packages/trpc/stats.ts')
-rw-r--r--packages/trpc/stats.ts142
1 files changed, 142 insertions, 0 deletions
diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts
new file mode 100644
index 00000000..465bddcd
--- /dev/null
+++ b/packages/trpc/stats.ts
@@ -0,0 +1,142 @@
+import { count, sum } from "drizzle-orm";
+import { Counter, Gauge, register, Summary } from "prom-client";
+
+import { db } from "@karakeep/db";
+import { assets, bookmarks, users } from "@karakeep/db/schema";
+import {
+ AssetPreprocessingQueue,
+ FeedQueue,
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ RuleEngineQueue,
+ SearchIndexingQueue,
+ TidyAssetsQueue,
+ VideoWorkerQueue,
+ WebhookQueue,
+} from "@karakeep/shared/queues";
+
+// Queue metrics
+const queuePendingJobsGauge = new Gauge({
+ name: "queue_jobs",
+ help: "Number of jobs in each background queue",
+ labelNames: ["queue_name", "status"],
+ async collect() {
+ const queues = [
+ { name: "link_crawler", queue: LinkCrawlerQueue },
+ { name: "openai", queue: OpenAIQueue },
+ { name: "search_indexing", queue: SearchIndexingQueue },
+ { name: "tidy_assets", queue: TidyAssetsQueue },
+ { name: "video_worker", queue: VideoWorkerQueue },
+ { name: "feed", queue: FeedQueue },
+ { name: "asset_preprocessing", queue: AssetPreprocessingQueue },
+ { name: "webhook", queue: WebhookQueue },
+ { name: "rule_engine", queue: RuleEngineQueue },
+ ];
+
+ const stats = await Promise.all(
+ queues.map(async ({ name, queue }) => {
+ try {
+ return {
+ ...(await queue.stats()),
+ name,
+ };
+ } catch (error) {
+ console.error(`Failed to get stats for queue ${name}:`, error);
+ return { name, pending: 0, pending_retry: 0, failed: 0, running: 0 };
+ }
+ }),
+ );
+
+ stats.forEach(({ name, pending, pending_retry, failed, running }) => {
+ this.set({ queue_name: name, status: "pending" }, pending);
+ this.set({ queue_name: name, status: "pending_retry" }, pending_retry);
+ this.set({ queue_name: name, status: "failed" }, failed);
+ this.set({ queue_name: name, status: "running" }, running);
+ });
+ },
+});
+
+// User metrics
+const totalUsersGauge = new Gauge({
+ name: "total_users",
+ help: "Total number of users in the system",
+ async collect() {
+ try {
+ const result = await db.select({ count: count() }).from(users);
+ this.set(result[0]?.count ?? 0);
+ } catch (error) {
+ console.error("Failed to get user count:", error);
+ this.set(0);
+ }
+ },
+});
+
+// Asset metrics
+const totalAssetSizeGauge = new Gauge({
+ name: "total_asset_size_bytes",
+ help: "Total size of all assets in bytes",
+ async collect() {
+ try {
+ const result = await db
+ .select({ totalSize: sum(assets.size) })
+ .from(assets);
+ this.set(Number(result[0]?.totalSize ?? 0));
+ } catch (error) {
+ console.error("Failed to get total asset size:", error);
+ this.set(0);
+ }
+ },
+});
+
+// Bookmark metrics
+const totalBookmarksGauge = new Gauge({
+ name: "total_bookmarks",
+ help: "Total number of bookmarks in the system",
+ async collect() {
+ try {
+ const result = await db.select({ count: count() }).from(bookmarks);
+ this.set(result[0]?.count ?? 0);
+ } catch (error) {
+ console.error("Failed to get bookmark count:", error);
+ this.set(0);
+ }
+ },
+});
+
+// Api metrics
+const apiRequestsTotalCounter = new Counter({
+ name: "trpc_requests_total",
+ help: "Total number of API requests",
+ labelNames: ["type", "path", "is_error"],
+});
+
+const apiErrorsTotalCounter = new Counter({
+ name: "trpc_errors_total",
+ help: "Total number of API requests",
+ labelNames: ["type", "path", "code"],
+});
+
+const apiRequestDurationSummary = new Summary({
+ name: "trpc_request_duration_seconds",
+ help: "Duration of tRPC requests in seconds",
+ labelNames: ["type", "path"],
+});
+
+// Register all metrics
+register.registerMetric(queuePendingJobsGauge);
+register.registerMetric(totalUsersGauge);
+register.registerMetric(totalAssetSizeGauge);
+register.registerMetric(totalBookmarksGauge);
+register.registerMetric(apiRequestsTotalCounter);
+register.registerMetric(apiErrorsTotalCounter);
+register.registerMetric(apiRequestDurationSummary);
+
+export {
+ queuePendingJobsGauge,
+ totalUsersGauge,
+ totalAssetSizeGauge,
+ totalBookmarksGauge,
+ apiRequestsTotalCounter,
+ apiErrorsTotalCounter,
+ apiRequestDurationSummary,
+};