aboutsummaryrefslogtreecommitdiffstats
path: root/packages
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-07-06 18:07:56 +0000
committerMohamed Bassem <me@mbassem.com>2025-07-06 18:09:05 +0000
commitb60ece578304df21602d39c7022a7a4dbc6437e0 (patch)
treea5e3395b0b3b5c9bb01566bf68aa21334fd13784 /packages
parentcfa0385b4dcd37f9cc29a15f94a59a4f48dd05fb (diff)
downloadkarakeep-b60ece578304df21602d39c7022a7a4dbc6437e0.tar.zst
feat: Add prometheus monitoring. Fixes #758
Diffstat (limited to 'packages')
-rw-r--r--packages/api/index.ts5
-rw-r--r--packages/api/middlewares/prometheusAuth.ts33
-rw-r--r--packages/api/package.json1
-rw-r--r--packages/api/routes/metrics.ts16
-rw-r--r--packages/shared/config.ts6
-rw-r--r--packages/trpc/index.ts42
-rw-r--r--packages/trpc/package.json1
-rw-r--r--packages/trpc/stats.ts142
8 files changed, 237 insertions, 9 deletions
diff --git a/packages/api/index.ts b/packages/api/index.ts
index 2eb22d8f..4103e033 100644
--- a/packages/api/index.ts
+++ b/packages/api/index.ts
@@ -10,6 +10,7 @@ import bookmarks from "./routes/bookmarks";
import health from "./routes/health";
import highlights from "./routes/highlights";
import lists from "./routes/lists";
+import metrics, { registerMetrics } from "./routes/metrics";
import publicRoute from "./routes/public";
import rss from "./routes/rss";
import tags from "./routes/tags";
@@ -37,6 +38,7 @@ const app = new Hono<{
}>()
.use(logger())
.use(poweredBy())
+ .use("*", registerMetrics)
.use(async (c, next) => {
// Ensure that the ctx is set
if (!c.var.ctx) {
@@ -49,6 +51,7 @@ const app = new Hono<{
.route("/trpc", trpc)
.route("/v1", v1)
.route("/assets", assets)
- .route("/public", publicRoute);
+ .route("/public", publicRoute)
+ .route("/metrics", metrics);
export default app;
diff --git a/packages/api/middlewares/prometheusAuth.ts b/packages/api/middlewares/prometheusAuth.ts
new file mode 100644
index 00000000..bf35608f
--- /dev/null
+++ b/packages/api/middlewares/prometheusAuth.ts
@@ -0,0 +1,33 @@
+import { createMiddleware } from "hono/factory";
+import { HTTPException } from "hono/http-exception";
+
+import serverConfig from "@karakeep/shared/config";
+
+export const prometheusAuthMiddleware = createMiddleware(async (c, next) => {
+ const { metricsToken } = serverConfig.prometheus;
+
+ // If no token is configured, deny access (safe default)
+ if (!metricsToken) {
+ throw new HTTPException(404, {
+ message: "Not Found",
+ });
+ }
+
+ const auth = c.req.header("Authorization");
+
+ if (!auth || !auth.startsWith("Bearer ")) {
+ throw new HTTPException(401, {
+ message: "Unauthorized",
+ });
+ }
+
+ const token = auth.slice(7); // Remove "Bearer " prefix
+
+ if (token !== metricsToken) {
+ throw new HTTPException(401, {
+ message: "Unauthorized",
+ });
+ }
+
+ await next();
+});
diff --git a/packages/api/package.json b/packages/api/package.json
index 54656e64..18d70501 100644
--- a/packages/api/package.json
+++ b/packages/api/package.json
@@ -13,6 +13,7 @@
"test": "vitest"
},
"dependencies": {
+ "@hono/prometheus": "^1.0.2",
"@hono/trpc-server": "^0.4.0",
"@hono/zod-validator": "^0.5.0",
"@karakeep/db": "workspace:*",
diff --git a/packages/api/routes/metrics.ts b/packages/api/routes/metrics.ts
new file mode 100644
index 00000000..90eff5b9
--- /dev/null
+++ b/packages/api/routes/metrics.ts
@@ -0,0 +1,16 @@
+// Import stats to register Prometheus metrics
+import "@karakeep/trpc/stats";
+
+import { prometheus } from "@hono/prometheus";
+import { Hono } from "hono";
+import { register } from "prom-client";
+
+import { prometheusAuthMiddleware } from "../middlewares/prometheusAuth";
+
+export const { printMetrics, registerMetrics } = prometheus({
+ registry: register,
+});
+
+const app = new Hono().get("/", prometheusAuthMiddleware, printMetrics);
+
+export default app;
diff --git a/packages/shared/config.ts b/packages/shared/config.ts
index 9294e154..c435d012 100644
--- a/packages/shared/config.ts
+++ b/packages/shared/config.ts
@@ -94,6 +94,9 @@ const allEnv = z.object({
// A flag to detect if the user is running in the old separete containers setup
USING_LEGACY_SEPARATE_CONTAINERS: stringBool("false"),
+ // Prometheus metrics configuration
+ PROMETHEUS_AUTH_TOKEN: z.string().optional(),
+
// Asset storage configuration
ASSET_STORE_S3_ENDPOINT: z.string().optional(),
ASSET_STORE_S3_REGION: z.string().optional(),
@@ -222,6 +225,9 @@ const serverConfigSchema = allEnv.transform((val) => {
forcePathStyle: val.ASSET_STORE_S3_FORCE_PATH_STYLE,
},
},
+ prometheus: {
+ metricsToken: val.PROMETHEUS_AUTH_TOKEN,
+ },
};
});
diff --git a/packages/trpc/index.ts b/packages/trpc/index.ts
index e34e56eb..90f37ae4 100644
--- a/packages/trpc/index.ts
+++ b/packages/trpc/index.ts
@@ -5,6 +5,12 @@ import { ZodError } from "zod";
import type { db } from "@karakeep/db";
import serverConfig from "@karakeep/shared/config";
+import {
+ apiErrorsTotalCounter,
+ apiRequestDurationSummary,
+ apiRequestsTotalCounter,
+} from "./stats";
+
interface User {
id: string;
name?: string | null | undefined;
@@ -36,6 +42,11 @@ const t = initTRPC.context<Context>().create({
transformer: superjson,
errorFormatter(opts) {
const { shape, error } = opts;
+ apiErrorsTotalCounter.inc({
+ type: opts.type,
+ path: opts.path,
+ code: error.code,
+ });
return {
...shape,
data: {
@@ -51,15 +62,30 @@ const t = initTRPC.context<Context>().create({
export const createCallerFactory = t.createCallerFactory;
// Base router and procedure helpers
export const router = t.router;
-export const procedure = t.procedure.use(function isDemoMode(opts) {
- if (serverConfig.demoMode && opts.type == "mutation") {
- throw new TRPCError({
- message: "Mutations are not allowed in demo mode",
- code: "FORBIDDEN",
+export const procedure = t.procedure
+ .use(function isDemoMode(opts) {
+ if (serverConfig.demoMode && opts.type == "mutation") {
+ throw new TRPCError({
+ message: "Mutations are not allowed in demo mode",
+ code: "FORBIDDEN",
+ });
+ }
+ return opts.next();
+ })
+ .use(async (opts) => {
+ const end = apiRequestDurationSummary.startTimer({
+ path: opts.path,
+ type: opts.type,
});
- }
- return opts.next();
-});
+ const res = await opts.next();
+ apiRequestsTotalCounter.inc({
+ type: opts.type,
+ path: opts.path,
+ is_error: res.ok ? 0 : 1,
+ });
+ end();
+ return res;
+ });
export const publicProcedure = procedure;
export const authedProcedure = procedure.use(function isAuthed(opts) {
diff --git a/packages/trpc/package.json b/packages/trpc/package.json
index b0280d6d..f4a9d122 100644
--- a/packages/trpc/package.json
+++ b/packages/trpc/package.json
@@ -19,6 +19,7 @@
"bcryptjs": "^2.4.3",
"deep-equal": "^2.2.3",
"drizzle-orm": "^0.38.3",
+ "prom-client": "^15.1.3",
"superjson": "^2.2.1",
"tiny-invariant": "^1.3.3",
"zod": "^3.24.2"
diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts
new file mode 100644
index 00000000..465bddcd
--- /dev/null
+++ b/packages/trpc/stats.ts
@@ -0,0 +1,142 @@
+import { count, sum } from "drizzle-orm";
+import { Counter, Gauge, register, Summary } from "prom-client";
+
+import { db } from "@karakeep/db";
+import { assets, bookmarks, users } from "@karakeep/db/schema";
+import {
+ AssetPreprocessingQueue,
+ FeedQueue,
+ LinkCrawlerQueue,
+ OpenAIQueue,
+ RuleEngineQueue,
+ SearchIndexingQueue,
+ TidyAssetsQueue,
+ VideoWorkerQueue,
+ WebhookQueue,
+} from "@karakeep/shared/queues";
+
+// Queue metrics
+const queuePendingJobsGauge = new Gauge({
+ name: "queue_jobs",
+ help: "Number of jobs in each background queue",
+ labelNames: ["queue_name", "status"],
+ async collect() {
+ const queues = [
+ { name: "link_crawler", queue: LinkCrawlerQueue },
+ { name: "openai", queue: OpenAIQueue },
+ { name: "search_indexing", queue: SearchIndexingQueue },
+ { name: "tidy_assets", queue: TidyAssetsQueue },
+ { name: "video_worker", queue: VideoWorkerQueue },
+ { name: "feed", queue: FeedQueue },
+ { name: "asset_preprocessing", queue: AssetPreprocessingQueue },
+ { name: "webhook", queue: WebhookQueue },
+ { name: "rule_engine", queue: RuleEngineQueue },
+ ];
+
+ const stats = await Promise.all(
+ queues.map(async ({ name, queue }) => {
+ try {
+ return {
+ ...(await queue.stats()),
+ name,
+ };
+ } catch (error) {
+ console.error(`Failed to get stats for queue ${name}:`, error);
+ return { name, pending: 0, pending_retry: 0, failed: 0, running: 0 };
+ }
+ }),
+ );
+
+ stats.forEach(({ name, pending, pending_retry, failed, running }) => {
+ this.set({ queue_name: name, status: "pending" }, pending);
+ this.set({ queue_name: name, status: "pending_retry" }, pending_retry);
+ this.set({ queue_name: name, status: "failed" }, failed);
+ this.set({ queue_name: name, status: "running" }, running);
+ });
+ },
+});
+
+// User metrics
+const totalUsersGauge = new Gauge({
+ name: "total_users",
+ help: "Total number of users in the system",
+ async collect() {
+ try {
+ const result = await db.select({ count: count() }).from(users);
+ this.set(result[0]?.count ?? 0);
+ } catch (error) {
+ console.error("Failed to get user count:", error);
+ this.set(0);
+ }
+ },
+});
+
+// Asset metrics
+const totalAssetSizeGauge = new Gauge({
+ name: "total_asset_size_bytes",
+ help: "Total size of all assets in bytes",
+ async collect() {
+ try {
+ const result = await db
+ .select({ totalSize: sum(assets.size) })
+ .from(assets);
+ this.set(Number(result[0]?.totalSize ?? 0));
+ } catch (error) {
+ console.error("Failed to get total asset size:", error);
+ this.set(0);
+ }
+ },
+});
+
+// Bookmark metrics
+const totalBookmarksGauge = new Gauge({
+ name: "total_bookmarks",
+ help: "Total number of bookmarks in the system",
+ async collect() {
+ try {
+ const result = await db.select({ count: count() }).from(bookmarks);
+ this.set(result[0]?.count ?? 0);
+ } catch (error) {
+ console.error("Failed to get bookmark count:", error);
+ this.set(0);
+ }
+ },
+});
+
+// Api metrics
+const apiRequestsTotalCounter = new Counter({
+ name: "trpc_requests_total",
+ help: "Total number of API requests",
+ labelNames: ["type", "path", "is_error"],
+});
+
+const apiErrorsTotalCounter = new Counter({
+ name: "trpc_errors_total",
+ help: "Total number of API requests",
+ labelNames: ["type", "path", "code"],
+});
+
+const apiRequestDurationSummary = new Summary({
+ name: "trpc_request_duration_seconds",
+ help: "Duration of tRPC requests in seconds",
+ labelNames: ["type", "path"],
+});
+
+// Register all metrics
+register.registerMetric(queuePendingJobsGauge);
+register.registerMetric(totalUsersGauge);
+register.registerMetric(totalAssetSizeGauge);
+register.registerMetric(totalBookmarksGauge);
+register.registerMetric(apiRequestsTotalCounter);
+register.registerMetric(apiErrorsTotalCounter);
+register.registerMetric(apiRequestDurationSummary);
+
+export {
+ queuePendingJobsGauge,
+ totalUsersGauge,
+ totalAssetSizeGauge,
+ totalBookmarksGauge,
+ apiRequestsTotalCounter,
+ apiErrorsTotalCounter,
+ apiRequestDurationSummary,
+};