From b60ece578304df21602d39c7022a7a4dbc6437e0 Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 6 Jul 2025 18:07:56 +0000 Subject: feat: Add prometheus monitoring. Fixes #758 --- packages/trpc/stats.ts | 142 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 packages/trpc/stats.ts (limited to 'packages/trpc/stats.ts') diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts new file mode 100644 index 00000000..465bddcd --- /dev/null +++ b/packages/trpc/stats.ts @@ -0,0 +1,142 @@ +import { count, sum } from "drizzle-orm"; +import { Counter, Gauge, register, Summary } from "prom-client"; + +import { db } from "@karakeep/db"; +import { assets, bookmarks, users } from "@karakeep/db/schema"; +import { + AssetPreprocessingQueue, + FeedQueue, + LinkCrawlerQueue, + OpenAIQueue, + RuleEngineQueue, + SearchIndexingQueue, + TidyAssetsQueue, + VideoWorkerQueue, + WebhookQueue, +} from "@karakeep/shared/queues"; + +// Queue metrics +const queuePendingJobsGauge = new Gauge({ + name: "queue_jobs", + help: "Number of jobs in each background queue", + labelNames: ["queue_name", "status"], + async collect() { + const queues = [ + { name: "link_crawler", queue: LinkCrawlerQueue }, + { name: "openai", queue: OpenAIQueue }, + { name: "search_indexing", queue: SearchIndexingQueue }, + { name: "tidy_assets", queue: TidyAssetsQueue }, + { name: "video_worker", queue: VideoWorkerQueue }, + { name: "feed", queue: FeedQueue }, + { name: "asset_preprocessing", queue: AssetPreprocessingQueue }, + { name: "webhook", queue: WebhookQueue }, + { name: "rule_engine", queue: RuleEngineQueue }, + ]; + + const stats = await Promise.all( + queues.map(async ({ name, queue }) => { + try { + return { + ...(await queue.stats()), + name, + }; + } catch (error) { + console.error(`Failed to get stats for queue ${name}:`, error); + return { name, pending: 0, pending_retry: 0, failed: 0, running: 0 }; + } + }), + ); + + stats.forEach(({ name, pending, pending_retry, failed, running }) => { + this.set({ queue_name: name, status: "pending" }, pending); + this.set({ queue_name: name, status: "pending_retry" }, pending_retry); + this.set({ queue_name: name, status: "failed" }, failed); + this.set({ queue_name: name, status: "running" }, running); + }); + }, +}); + +// User metrics +const totalUsersGauge = new Gauge({ + name: "total_users", + help: "Total number of users in the system", + async collect() { + try { + const result = await db.select({ count: count() }).from(users); + this.set(result[0]?.count ?? 0); + } catch (error) { + console.error("Failed to get user count:", error); + this.set(0); + } + }, +}); + +// Asset metrics +const totalAssetSizeGauge = new Gauge({ + name: "total_asset_size_bytes", + help: "Total size of all assets in bytes", + async collect() { + try { + const result = await db + .select({ totalSize: sum(assets.size) }) + .from(assets); + this.set(Number(result[0]?.totalSize ?? 0)); + } catch (error) { + console.error("Failed to get total asset size:", error); + this.set(0); + } + }, +}); + +// Bookmark metrics +const totalBookmarksGauge = new Gauge({ + name: "total_bookmarks", + help: "Total number of bookmarks in the system", + async collect() { + try { + const result = await db.select({ count: count() }).from(bookmarks); + this.set(result[0]?.count ?? 0); + } catch (error) { + console.error("Failed to get bookmark count:", error); + this.set(0); + } + }, +}); + +// Api metrics +const apiRequestsTotalCounter = new Counter({ + name: "trpc_requests_total", + help: "Total number of API requests", + labelNames: ["type", "path", "is_error"], +}); + +const apiErrorsTotalCounter = new Counter({ + name: "trpc_errors_total", + help: "Total number of API requests", + labelNames: ["type", "path", "code"], +}); + +const apiRequestDurationSummary = new Summary({ + name: "trpc_request_duration_seconds", + help: "Duration of tRPC requests in seconds", + labelNames: ["type", "path"], +}); + +// Register all metrics +register.registerMetric(queuePendingJobsGauge); +register.registerMetric(totalUsersGauge); +register.registerMetric(totalAssetSizeGauge); +register.registerMetric(totalBookmarksGauge); +register.registerMetric(apiRequestsTotalCounter); +register.registerMetric(apiErrorsTotalCounter); +register.registerMetric(apiRequestDurationSummary); + +export { + queuePendingJobsGauge, + totalUsersGauge, + totalAssetSizeGauge, + totalBookmarksGauge, + apiRequestsTotalCounter, + apiErrorsTotalCounter, + apiRequestDurationSummary, +}; -- cgit v1.2.3-70-g09d2