diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-07-06 18:07:56 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2025-07-06 18:09:05 +0000 |
| commit | b60ece578304df21602d39c7022a7a4dbc6437e0 (patch) | |
| tree | a5e3395b0b3b5c9bb01566bf68aa21334fd13784 /packages/trpc/stats.ts | |
| parent | cfa0385b4dcd37f9cc29a15f94a59a4f48dd05fb (diff) | |
| download | karakeep-b60ece578304df21602d39c7022a7a4dbc6437e0.tar.zst | |
feat: Add prometheus monitoring. Fixes #758
Diffstat (limited to 'packages/trpc/stats.ts')
| -rw-r--r-- | packages/trpc/stats.ts | 142 |
1 files changed, 142 insertions, 0 deletions
diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts new file mode 100644 index 00000000..465bddcd --- /dev/null +++ b/packages/trpc/stats.ts @@ -0,0 +1,142 @@ +import { count, sum } from "drizzle-orm"; +import { Counter, Gauge, register, Summary } from "prom-client"; + +import { db } from "@karakeep/db"; +import { assets, bookmarks, users } from "@karakeep/db/schema"; +import { + AssetPreprocessingQueue, + FeedQueue, + LinkCrawlerQueue, + OpenAIQueue, + RuleEngineQueue, + SearchIndexingQueue, + TidyAssetsQueue, + VideoWorkerQueue, + WebhookQueue, +} from "@karakeep/shared/queues"; + +// Queue metrics +const queuePendingJobsGauge = new Gauge({ + name: "queue_jobs", + help: "Number of jobs in each background queue", + labelNames: ["queue_name", "status"], + async collect() { + const queues = [ + { name: "link_crawler", queue: LinkCrawlerQueue }, + { name: "openai", queue: OpenAIQueue }, + { name: "search_indexing", queue: SearchIndexingQueue }, + { name: "tidy_assets", queue: TidyAssetsQueue }, + { name: "video_worker", queue: VideoWorkerQueue }, + { name: "feed", queue: FeedQueue }, + { name: "asset_preprocessing", queue: AssetPreprocessingQueue }, + { name: "webhook", queue: WebhookQueue }, + { name: "rule_engine", queue: RuleEngineQueue }, + ]; + + const stats = await Promise.all( + queues.map(async ({ name, queue }) => { + try { + return { + ...(await queue.stats()), + name, + }; + } catch (error) { + console.error(`Failed to get stats for queue ${name}:`, error); + return { name, pending: 0, pending_retry: 0, failed: 0, running: 0 }; + } + }), + ); + + stats.forEach(({ name, pending, pending_retry, failed, running }) => { + this.set({ queue_name: name, status: "pending" }, pending); + this.set({ queue_name: name, status: "pending_retry" }, pending_retry); + this.set({ queue_name: name, status: "failed" }, failed); + this.set({ queue_name: name, status: "running" }, running); + }); + }, +}); + +// User metrics +const totalUsersGauge = new Gauge({ + name: "total_users", + help: "Total number of users in the system", + async collect() { + try { + const result = await db.select({ count: count() }).from(users); + this.set(result[0]?.count ?? 0); + } catch (error) { + console.error("Failed to get user count:", error); + this.set(0); + } + }, +}); + +// Asset metrics +const totalAssetSizeGauge = new Gauge({ + name: "total_asset_size_bytes", + help: "Total size of all assets in bytes", + async collect() { + try { + const result = await db + .select({ totalSize: sum(assets.size) }) + .from(assets); + this.set(Number(result[0]?.totalSize ?? 0)); + } catch (error) { + console.error("Failed to get total asset size:", error); + this.set(0); + } + }, +}); + +// Bookmark metrics +const totalBookmarksGauge = new Gauge({ + name: "total_bookmarks", + help: "Total number of bookmarks in the system", + async collect() { + try { + const result = await db.select({ count: count() }).from(bookmarks); + this.set(result[0]?.count ?? 0); + } catch (error) { + console.error("Failed to get bookmark count:", error); + this.set(0); + } + }, +}); + +// Api metrics +const apiRequestsTotalCounter = new Counter({ + name: "trpc_requests_total", + help: "Total number of API requests", + labelNames: ["type", "path", "is_error"], +}); + +const apiErrorsTotalCounter = new Counter({ + name: "trpc_errors_total", + help: "Total number of API requests", + labelNames: ["type", "path", "code"], +}); + +const apiRequestDurationSummary = new Summary({ + name: "trpc_request_duration_seconds", + help: "Duration of tRPC requests in seconds", + labelNames: ["type", "path"], +}); + +// Register all metrics +register.registerMetric(queuePendingJobsGauge); +register.registerMetric(totalUsersGauge); +register.registerMetric(totalAssetSizeGauge); +register.registerMetric(totalBookmarksGauge); +register.registerMetric(apiRequestsTotalCounter); +register.registerMetric(apiErrorsTotalCounter); +register.registerMetric(apiRequestDurationSummary); + +export { + queuePendingJobsGauge, + totalUsersGauge, + totalAssetSizeGauge, + totalBookmarksGauge, + apiRequestsTotalCounter, + apiErrorsTotalCounter, + apiRequestDurationSummary, +}; |
