diff options
| author | Mohamed Bassem <me@mbassem.com> | 2025-12-29 19:11:16 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2025-12-29 17:11:16 +0000 |
| commit | 5537fe85ed65444359bfd066707760d6395fc7a4 (patch) | |
| tree | 03024b1312f04d9cd4ff09b327eae7cea3f258a9 /packages | |
| parent | f7920bdc94d97a6a94477f49e145432607b94951 (diff) | |
| download | karakeep-5537fe85ed65444359bfd066707760d6395fc7a4.tar.zst | |
feat: Add OpenTelemetry (#2318)
* feat: add OpenTelemetry tracing infrastructure
Introduce distributed tracing capabilities using OpenTelemetry:
- Add @opentelemetry packages to shared-server for tracing
- Create tracing utility module with span helpers (withSpan, addSpanEvent, etc.)
- Add tRPC middleware for automatic span creation on API calls
- Initialize tracing in API and workers entry points
- Add demo instrumentation to bookmark creation and crawler worker
- Add configuration options (OTEL_TRACING_ENABLED, OTEL_EXPORTER_OTLP_ENDPOINT, etc.)
- Document tracing configuration in environment variables docs
When enabled, traces are collected for tRPC calls, bookmark creation flow,
and crawler operations, with support for any OTLP-compatible backend (Jaeger, Tempo, etc.)
* refactor: remove tracing from workers for now
Keep tracing infrastructure but remove worker instrumentation:
- Remove tracing initialization from workers entry point
- Remove tracing instrumentation from crawler worker
- Fix formatting in tracing files
The tracing infrastructure remains available for future use.
* add hono and next tracing
* remove extra span logging
* more fixes
* update config
* some fixes
* upgrade packages
* remove unneeded packages
---------
Co-authored-by: Claude <noreply@anthropic.com>
Diffstat (limited to 'packages')
| -rw-r--r-- | packages/api/index.ts | 17 | ||||
| -rw-r--r-- | packages/api/package.json | 1 | ||||
| -rw-r--r-- | packages/shared-server/package.json | 8 | ||||
| -rw-r--r-- | packages/shared-server/src/index.ts | 1 | ||||
| -rw-r--r-- | packages/shared-server/src/tracing.ts | 272 | ||||
| -rw-r--r-- | packages/shared/config.ts | 12 | ||||
| -rw-r--r-- | packages/trpc/index.ts | 5 | ||||
| -rw-r--r-- | packages/trpc/lib/tracing.ts | 63 |
8 files changed, 376 insertions, 3 deletions
diff --git a/packages/api/index.ts b/packages/api/index.ts index 3df7b429..ac31c977 100644 --- a/packages/api/index.ts +++ b/packages/api/index.ts @@ -1,9 +1,11 @@ +import { httpInstrumentationMiddleware } from "@hono/otel"; import { Hono } from "hono"; import { cors } from "hono/cors"; import { logger as loggerMiddleware } from "hono/logger"; import { poweredBy } from "hono/powered-by"; import { loadAllPlugins } from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; import { Context } from "@karakeep/trpc"; @@ -52,7 +54,20 @@ const app = new Hono<{ logger.info(str); }), ) - .use(poweredBy()) + .use(poweredBy()); + +// Add OpenTelemetry middleware if tracing is enabled +if (serverConfig.tracing.enabled) { + app.use( + "*", + httpInstrumentationMiddleware({ + serviceName: `${serverConfig.tracing.serviceName}-api`, + serviceVersion: serverConfig.serverVersion ?? "unknown", + }), + ); +} + +app .use( cors({ origin: "*", diff --git a/packages/api/package.json b/packages/api/package.json index b5d90f03..e49204b9 100644 --- a/packages/api/package.json +++ b/packages/api/package.json @@ -13,6 +13,7 @@ "test": "vitest" }, "dependencies": { + "@hono/otel": "^1.1.0", "@hono/prometheus": "^1.0.2", "@hono/trpc-server": "^0.4.0", "@hono/zod-validator": "^0.5.0", diff --git a/packages/shared-server/package.json b/packages/shared-server/package.json index 578c3330..357248b4 100644 --- a/packages/shared-server/package.json +++ b/packages/shared-server/package.json @@ -7,7 +7,13 @@ "dependencies": { "@karakeep/db": "workspace:^0.1.0", "@karakeep/plugins": "workspace:^0.1.0", - "@karakeep/shared": "workspace:^0.1.0" + "@karakeep/shared": "workspace:^0.1.0", + "@opentelemetry/api": "^1.9.0", + "@opentelemetry/exporter-trace-otlp-http": "^0.208.0", + "@opentelemetry/resources": "^2.2.0", + "@opentelemetry/sdk-trace-base": "^2.2.0", + "@opentelemetry/sdk-trace-node": "^2.2.0", + 
"@opentelemetry/semantic-conventions": "^1.38.0" }, "devDependencies": { "@karakeep/prettier-config": "workspace:^0.1.0", diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts index d42118c2..8e3b2e52 100644 --- a/packages/shared-server/src/index.ts +++ b/packages/shared-server/src/index.ts @@ -1,3 +1,4 @@ export { loadAllPlugins } from "./plugins"; export { QuotaService, StorageQuotaError } from "./services/quotaService"; export * from "./queues"; +export * from "./tracing"; diff --git a/packages/shared-server/src/tracing.ts b/packages/shared-server/src/tracing.ts new file mode 100644 index 00000000..e831e019 --- /dev/null +++ b/packages/shared-server/src/tracing.ts @@ -0,0 +1,272 @@ +import type { Context, Span, Tracer } from "@opentelemetry/api"; +import { + context, + propagation, + SpanKind, + SpanStatusCode, + trace, +} from "@opentelemetry/api"; +import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; +import { resourceFromAttributes } from "@opentelemetry/resources"; +import { + BatchSpanProcessor, + ConsoleSpanExporter, + ParentBasedSampler, + SimpleSpanProcessor, + TraceIdRatioBasedSampler, +} from "@opentelemetry/sdk-trace-base"; +import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"; +import { + ATTR_SERVICE_NAME, + ATTR_SERVICE_VERSION, +} from "@opentelemetry/semantic-conventions"; + +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; + +let tracerProvider: NodeTracerProvider | null = null; +let isInitialized = false; + +/** + * Initialize the OpenTelemetry tracing infrastructure. + * Should be called once at application startup. 
+ */ +export function initTracing(serviceSuffix?: string): void { + if (isInitialized) { + logger.debug("Tracing already initialized, skipping"); + return; + } + + if (!serverConfig.tracing.enabled) { + logger.info("Tracing is disabled"); + isInitialized = true; + return; + } + + const serviceName = serviceSuffix + ? `${serverConfig.tracing.serviceName}-${serviceSuffix}` + : serverConfig.tracing.serviceName; + + logger.info(`Initializing OpenTelemetry tracing for service: ${serviceName}`); + + const resource = resourceFromAttributes({ + [ATTR_SERVICE_NAME]: serviceName, + [ATTR_SERVICE_VERSION]: serverConfig.serverVersion ?? "unknown", + }); + + // Configure span processors + const spanProcessors = []; + + if (serverConfig.tracing.otlpEndpoint) { + // OTLP exporter (Jaeger, Zipkin, etc.) + const otlpExporter = new OTLPTraceExporter({ + url: serverConfig.tracing.otlpEndpoint, + }); + spanProcessors.push(new BatchSpanProcessor(otlpExporter)); + logger.info( + `OTLP exporter configured: ${serverConfig.tracing.otlpEndpoint}`, + ); + } else { + // Fallback to console exporter for development + spanProcessors.push(new SimpleSpanProcessor(new ConsoleSpanExporter())); + logger.info("Console span exporter configured (no OTLP endpoint set)"); + } + + tracerProvider = new NodeTracerProvider({ + resource, + sampler: new ParentBasedSampler({ + root: new TraceIdRatioBasedSampler(serverConfig.tracing.sampleRate), + }), + spanProcessors, + }); + + // Register the provider globally + tracerProvider.register(); + + isInitialized = true; + logger.info("OpenTelemetry tracing initialized successfully"); +} + +/** + * Shutdown the tracing infrastructure gracefully. + * Should be called on application shutdown. + */ +export async function shutdownTracing(): Promise<void> { + if (tracerProvider) { + await tracerProvider.shutdown(); + logger.info("OpenTelemetry tracing shut down"); + } +} + +/** + * Get a tracer instance for creating spans. 
+ * @param name - The name of the tracer (typically the module/component name) + */ +export function getTracer(name: string): Tracer { + return trace.getTracer(name); +} + +/** + * Get the currently active span, if any. + */ +export function getActiveSpan(): Span | undefined { + return trace.getActiveSpan(); +} + +/** + * Get the current trace context. + */ +export function getActiveContext(): Context { + return context.active(); +} + +/** + * Execute a function within a new span. + * Automatically handles error recording and span status. + */ +export async function withSpan<T>( + tracer: Tracer, + spanName: string, + options: { + kind?: SpanKind; + attributes?: Record<string, string | number | boolean>; + }, + fn: (span: Span) => Promise<T>, +): Promise<T> { + return tracer.startActiveSpan( + spanName, + { + kind: options.kind ?? SpanKind.INTERNAL, + attributes: options.attributes, + }, + async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + span.recordException( + error instanceof Error ? error : new Error(String(error)), + ); + throw error; + } finally { + span.end(); + } + }, + ); +} + +/** + * Execute a synchronous function within a new span. + */ +export function withSpanSync<T>( + tracer: Tracer, + spanName: string, + options: { + kind?: SpanKind; + attributes?: Record<string, string | number | boolean>; + }, + fn: (span: Span) => T, +): T { + const span = tracer.startSpan(spanName, { + kind: options.kind ?? SpanKind.INTERNAL, + attributes: options.attributes, + }); + + try { + const result = context.with(trace.setSpan(context.active(), span), () => + fn(span), + ); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? 
error.message : String(error), + }); + span.recordException( + error instanceof Error ? error : new Error(String(error)), + ); + throw error; + } finally { + span.end(); + } +} + +/** + * Add an event to the current active span. + */ +export function addSpanEvent( + name: string, + attributes?: Record<string, string | number | boolean>, +): void { + const span = getActiveSpan(); + if (span) { + span.addEvent(name, attributes); + } +} + +/** + * Set attributes on the current active span. + */ +export function setSpanAttributes( + attributes: Record<string, string | number | boolean>, +): void { + const span = getActiveSpan(); + if (span) { + span.setAttributes(attributes); + } +} + +/** + * Record an error on the current active span. + */ +export function recordSpanError(error: Error): void { + const span = getActiveSpan(); + if (span) { + span.recordException(error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message, + }); + } +} + +/** + * Extract trace context from HTTP headers (for distributed tracing). + */ +export function extractTraceContext( + headers: Record<string, string | string[] | undefined>, +): Context { + const normalizedHeaders: Record<string, string> = {}; + for (const [key, value] of Object.entries(headers)) { + if (value) { + normalizedHeaders[key] = Array.isArray(value) ? value[0] : value; + } + } + return propagation.extract(context.active(), normalizedHeaders); +} + +/** + * Inject trace context into HTTP headers (for distributed tracing). + */ +export function injectTraceContext( + headers: Record<string, string>, +): Record<string, string> { + propagation.inject(context.active(), headers); + return headers; +} + +/** + * Run a function within a specific context. 
+ */ +export function runWithContext<T>(ctx: Context, fn: () => T): T { + return context.with(ctx, fn); +} + +// Re-export commonly used types and constants +export { SpanKind, SpanStatusCode } from "@opentelemetry/api"; diff --git a/packages/shared/config.ts b/packages/shared/config.ts index c10634e2..28dcc624 100644 --- a/packages/shared/config.ts +++ b/packages/shared/config.ts @@ -211,6 +211,12 @@ const allEnv = z.object({ // Database configuration DB_WAL_MODE: stringBool("false"), + + // OpenTelemetry tracing configuration + OTEL_TRACING_ENABLED: stringBool("false"), + OTEL_EXPORTER_OTLP_ENDPOINT: z.string().url().optional(), + OTEL_SERVICE_NAME: z.string().default("karakeep"), + OTEL_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0), }); const serverConfigSchema = allEnv.transform((val, ctx) => { @@ -413,6 +419,12 @@ const serverConfigSchema = allEnv.transform((val, ctx) => { database: { walMode: val.DB_WAL_MODE, }, + tracing: { + enabled: val.OTEL_TRACING_ENABLED, + otlpEndpoint: val.OTEL_EXPORTER_OTLP_ENDPOINT, + serviceName: val.OTEL_SERVICE_NAME, + sampleRate: val.OTEL_SAMPLE_RATE, + }, }; if (obj.auth.emailVerificationRequired && !obj.email.smtp) { ctx.addIssue({ diff --git a/packages/trpc/index.ts b/packages/trpc/index.ts index 555ca3ba..178703f0 100644 --- a/packages/trpc/index.ts +++ b/packages/trpc/index.ts @@ -6,6 +6,7 @@ import type { db } from "@karakeep/db"; import serverConfig from "@karakeep/shared/config"; import { createRateLimitMiddleware } from "./lib/rateLimit"; +import { createTracingMiddleware } from "./lib/tracing"; import { apiErrorsTotalCounter, apiRequestDurationSummary, @@ -86,7 +87,9 @@ export const procedure = t.procedure }); end(); return res; - }); + }) + // OpenTelemetry tracing middleware + .use(createTracingMiddleware()); // Default public procedure rate limiting export const publicProcedure = procedure.use( diff --git a/packages/trpc/lib/tracing.ts b/packages/trpc/lib/tracing.ts new file mode 100644 index 
00000000..7b4fb39f --- /dev/null +++ b/packages/trpc/lib/tracing.ts @@ -0,0 +1,63 @@ +import { SpanKind } from "@opentelemetry/api"; + +import { + getTracer, + setSpanAttributes, + withSpan, +} from "@karakeep/shared-server"; +import serverConfig from "@karakeep/shared/config"; + +import type { Context } from "../index"; + +const tracer = getTracer("@karakeep/trpc"); + +/** + * tRPC middleware that creates a span for each procedure call. + * This integrates OpenTelemetry tracing into the tRPC layer. + */ +export function createTracingMiddleware() { + return async function tracingMiddleware<T>(opts: { + ctx: Context; + type: "query" | "mutation" | "subscription"; + path: string; + input: unknown; + next: () => Promise<T>; + }): Promise<T> { + // Skip if tracing is disabled + if (!serverConfig.tracing.enabled) { + return opts.next(); + } + + const spanName = `trpc.${opts.type}.${opts.path}`; + + return withSpan( + tracer, + spanName, + { + kind: SpanKind.SERVER, + attributes: { + "rpc.system": "trpc", + "rpc.method": opts.path, + "rpc.type": opts.type, + "user.id": opts.ctx.user?.id ?? "anonymous", + "user.role": opts.ctx.user?.role ?? "none", + }, + }, + async () => { + return await opts.next(); + }, + ); + }; +} + +/** + * Helper to add tracing attributes within a tRPC procedure. + * Use this to add custom attributes to the current span. + */ +export function addTracingAttributes( + attributes: Record<string, string | number | boolean>, +): void { + if (serverConfig.tracing.enabled) { + setSpanAttributes(attributes); + } +} |
