diff options
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/api/index.ts | 5 | ||||
| -rw-r--r-- | packages/api/middlewares/prometheusAuth.ts | 33 | ||||
| -rw-r--r-- | packages/api/package.json | 1 | ||||
| -rw-r--r-- | packages/api/routes/metrics.ts | 16 | ||||
| -rw-r--r-- | packages/shared/config.ts | 6 | ||||
| -rw-r--r-- | packages/trpc/index.ts | 42 | ||||
| -rw-r--r-- | packages/trpc/package.json | 1 | ||||
| -rw-r--r-- | packages/trpc/stats.ts | 142 |
8 files changed, 237 insertions, 9 deletions
diff --git a/packages/api/index.ts b/packages/api/index.ts index 2eb22d8f..4103e033 100644 --- a/packages/api/index.ts +++ b/packages/api/index.ts @@ -10,6 +10,7 @@ import bookmarks from "./routes/bookmarks"; import health from "./routes/health"; import highlights from "./routes/highlights"; import lists from "./routes/lists"; +import metrics, { registerMetrics } from "./routes/metrics"; import publicRoute from "./routes/public"; import rss from "./routes/rss"; import tags from "./routes/tags"; @@ -37,6 +38,7 @@ const app = new Hono<{ }>() .use(logger()) .use(poweredBy()) + .use("*", registerMetrics) .use(async (c, next) => { // Ensure that the ctx is set if (!c.var.ctx) { @@ -49,6 +51,7 @@ const app = new Hono<{ .route("/trpc", trpc) .route("/v1", v1) .route("/assets", assets) - .route("/public", publicRoute); + .route("/public", publicRoute) + .route("/metrics", metrics); export default app; diff --git a/packages/api/middlewares/prometheusAuth.ts b/packages/api/middlewares/prometheusAuth.ts new file mode 100644 index 00000000..bf35608f --- /dev/null +++ b/packages/api/middlewares/prometheusAuth.ts @@ -0,0 +1,33 @@ +import { createMiddleware } from "hono/factory"; +import { HTTPException } from "hono/http-exception"; + +import serverConfig from "@karakeep/shared/config"; + +export const prometheusAuthMiddleware = createMiddleware(async (c, next) => { + const { metricsToken } = serverConfig.prometheus; + + // If no token is configured, deny access (safe default) + if (!metricsToken) { + throw new HTTPException(404, { + message: "Not Found", + }); + } + + const auth = c.req.header("Authorization"); + + if (!auth || !auth.startsWith("Bearer ")) { + throw new HTTPException(401, { + message: "Unauthorized", + }); + } + + const token = auth.slice(7); // Remove "Bearer " prefix + + if (token !== metricsToken) { + throw new HTTPException(401, { + message: "Unauthorized", + }); + } + + await next(); +}); diff --git a/packages/api/package.json b/packages/api/package.json index 54656e64..18d70501 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -13,6 +13,7 @@ "test": "vitest" }, "dependencies": { + "@hono/prometheus": "^1.0.2", "@hono/trpc-server": "^0.4.0", "@hono/zod-validator": "^0.5.0", "@karakeep/db": "workspace:*", diff --git a/packages/api/routes/metrics.ts b/packages/api/routes/metrics.ts new file mode 100644 index 00000000..90eff5b9 --- /dev/null +++ b/packages/api/routes/metrics.ts @@ -0,0 +1,16 @@ +// Import stats to register Prometheus metrics +import "@karakeep/trpc/stats"; + +import { prometheus } from "@hono/prometheus"; +import { Hono } from "hono"; +import { register } from "prom-client"; + +import { prometheusAuthMiddleware } from "../middlewares/prometheusAuth"; + +export const { printMetrics, registerMetrics } = prometheus({ + registry: register, +}); + +const app = new Hono().get("/", prometheusAuthMiddleware, printMetrics); + +export default app; diff --git a/packages/shared/config.ts b/packages/shared/config.ts index 9294e154..c435d012 100644 --- a/packages/shared/config.ts +++ b/packages/shared/config.ts @@ -94,6 +94,9 @@ const allEnv = z.object({ // A flag to detect if the user is running in the old separete containers setup USING_LEGACY_SEPARATE_CONTAINERS: stringBool("false"), + // Prometheus metrics configuration + PROMETHEUS_AUTH_TOKEN: z.string().optional(), + // Asset storage configuration ASSET_STORE_S3_ENDPOINT: z.string().optional(), ASSET_STORE_S3_REGION: z.string().optional(), @@ -222,6 +225,9 @@ const serverConfigSchema = allEnv.transform((val) => { forcePathStyle: val.ASSET_STORE_S3_FORCE_PATH_STYLE, }, }, + prometheus: { + metricsToken: val.PROMETHEUS_AUTH_TOKEN, + }, }; }); diff --git a/packages/trpc/index.ts b/packages/trpc/index.ts index e34e56eb..90f37ae4 100644 --- a/packages/trpc/index.ts +++ b/packages/trpc/index.ts @@ -5,6 +5,12 @@ import { ZodError } from "zod"; import type { db } from "@karakeep/db"; import serverConfig from "@karakeep/shared/config"; +import { + apiErrorsTotalCounter, + apiRequestDurationSummary, + apiRequestsTotalCounter, +} from "./stats"; + interface User { id: string; name?: string | null | undefined; @@ -36,6 +42,11 @@ const t = initTRPC.context<Context>().create({ transformer: superjson, errorFormatter(opts) { const { shape, error } = opts; + apiErrorsTotalCounter.inc({ + type: opts.type, + path: opts.path, + code: error.code, + }); return { ...shape, data: { @@ -51,15 +62,30 @@ const t = initTRPC.context<Context>().create({ export const createCallerFactory = t.createCallerFactory; // Base router and procedure helpers export const router = t.router; -export const procedure = t.procedure.use(function isDemoMode(opts) { - if (serverConfig.demoMode && opts.type == "mutation") { - throw new TRPCError({ - message: "Mutations are not allowed in demo mode", - code: "FORBIDDEN", +export const procedure = t.procedure + .use(function isDemoMode(opts) { + if (serverConfig.demoMode && opts.type == "mutation") { + throw new TRPCError({ + message: "Mutations are not allowed in demo mode", + code: "FORBIDDEN", + }); + } + return opts.next(); + }) + .use(async (opts) => { + const end = apiRequestDurationSummary.startTimer({ + path: opts.path, + type: opts.type, }); - } - return opts.next(); -}); + const res = await opts.next(); + apiRequestsTotalCounter.inc({ + type: opts.type, + path: opts.path, + is_error: res.ok ? 0 : 1, + }); + end(); + return res; + }); export const publicProcedure = procedure; export const authedProcedure = procedure.use(function isAuthed(opts) { diff --git a/packages/trpc/package.json b/packages/trpc/package.json index b0280d6d..f4a9d122 100644 --- a/packages/trpc/package.json +++ b/packages/trpc/package.json @@ -19,6 +19,7 @@ "bcryptjs": "^2.4.3", "deep-equal": "^2.2.3", "drizzle-orm": "^0.38.3", + "prom-client": "^15.1.3", "superjson": "^2.2.1", "tiny-invariant": "^1.3.3", "zod": "^3.24.2" diff --git a/packages/trpc/stats.ts b/packages/trpc/stats.ts new file mode 100644 index 00000000..465bddcd --- /dev/null +++ b/packages/trpc/stats.ts @@ -0,0 +1,142 @@ +import { count, sum } from "drizzle-orm"; +import { Counter, Gauge, register, Summary } from "prom-client"; + +import { db } from "@karakeep/db"; +import { assets, bookmarks, users } from "@karakeep/db/schema"; +import { + AssetPreprocessingQueue, + FeedQueue, + LinkCrawlerQueue, + OpenAIQueue, + RuleEngineQueue, + SearchIndexingQueue, + TidyAssetsQueue, + VideoWorkerQueue, + WebhookQueue, +} from "@karakeep/shared/queues"; + +// Queue metrics +const queuePendingJobsGauge = new Gauge({ + name: "queue_jobs", + help: "Number of jobs in each background queue", + labelNames: ["queue_name", "status"], + async collect() { + const queues = [ + { name: "link_crawler", queue: LinkCrawlerQueue }, + { name: "openai", queue: OpenAIQueue }, + { name: "search_indexing", queue: SearchIndexingQueue }, + { name: "tidy_assets", queue: TidyAssetsQueue }, + { name: "video_worker", queue: VideoWorkerQueue }, + { name: "feed", queue: FeedQueue }, + { name: "asset_preprocessing", queue: AssetPreprocessingQueue }, + { name: "webhook", queue: WebhookQueue }, + { name: "rule_engine", queue: RuleEngineQueue }, + ]; + + const stats = await Promise.all( + queues.map(async ({ name, queue }) => { + try { + return { + ...(await queue.stats()), + name, + }; + } catch (error) { + console.error(`Failed to get stats for queue ${name}:`, error); + return { name, pending: 0, pending_retry: 0, failed: 0, running: 0 }; + } + }), + ); + + stats.forEach(({ name, pending, pending_retry, failed, running }) => { + this.set({ queue_name: name, status: "pending" }, pending); + this.set({ queue_name: name, status: "pending_retry" }, pending_retry); + this.set({ queue_name: name, status: "failed" }, failed); + this.set({ queue_name: name, status: "running" }, running); + }); + }, +}); + +// User metrics +const totalUsersGauge = new Gauge({ + name: "total_users", + help: "Total number of users in the system", + async collect() { + try { + const result = await db.select({ count: count() }).from(users); + this.set(result[0]?.count ?? 0); + } catch (error) { + console.error("Failed to get user count:", error); + this.set(0); + } + }, +}); + +// Asset metrics +const totalAssetSizeGauge = new Gauge({ + name: "total_asset_size_bytes", + help: "Total size of all assets in bytes", + async collect() { + try { + const result = await db + .select({ totalSize: sum(assets.size) }) + .from(assets); + this.set(Number(result[0]?.totalSize ?? 0)); + } catch (error) { + console.error("Failed to get total asset size:", error); + this.set(0); + } + }, +}); + +// Bookmark metrics +const totalBookmarksGauge = new Gauge({ + name: "total_bookmarks", + help: "Total number of bookmarks in the system", + async collect() { + try { + const result = await db.select({ count: count() }).from(bookmarks); + this.set(result[0]?.count ?? 0); + } catch (error) { + console.error("Failed to get bookmark count:", error); + this.set(0); + } + }, +}); + +// Api metrics +const apiRequestsTotalCounter = new Counter({ + name: "trpc_requests_total", + help: "Total number of API requests", + labelNames: ["type", "path", "is_error"], +}); + +const apiErrorsTotalCounter = new Counter({ + name: "trpc_errors_total", + help: "Total number of API requests", + labelNames: ["type", "path", "code"], +}); + +const apiRequestDurationSummary = new Summary({ + name: "trpc_request_duration_seconds", + help: "Duration of tRPC requests in seconds", + labelNames: ["type", "path"], +}); + +// Register all metrics +register.registerMetric(queuePendingJobsGauge); +register.registerMetric(totalUsersGauge); +register.registerMetric(totalAssetSizeGauge); +register.registerMetric(totalBookmarksGauge); +register.registerMetric(apiRequestsTotalCounter); +register.registerMetric(apiErrorsTotalCounter); +register.registerMetric(apiRequestDurationSummary); + +export { + queuePendingJobsGauge, + totalUsersGauge, + totalAssetSizeGauge, + totalBookmarksGauge, + apiRequestsTotalCounter, + apiErrorsTotalCounter, + apiRequestDurationSummary, +}; |
