From 7ab7db8e48360417498643eec2384b0fcb7fbdfb Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Tue, 30 Dec 2025 12:43:08 +0200 Subject: chore: worker tracing (#2321) --- apps/workers/workerTracing.ts | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) create mode 100644 apps/workers/workerTracing.ts (limited to 'apps/workers/workerTracing.ts') diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts new file mode 100644 index 00000000..3ff16d1c --- /dev/null +++ b/apps/workers/workerTracing.ts @@ -0,0 +1,43 @@ +import type { DequeuedJob } from "@karakeep/shared/queueing"; +import { getTracer, withSpan } from "@karakeep/shared-server"; + +const tracer = getTracer("@karakeep/workers"); + +type WorkerRunFn = ( + job: DequeuedJob, +) => Promise; + +/** + * Wraps a worker run function with OpenTelemetry tracing. + * Creates a span for each job execution and automatically handles error recording. + * + * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run") + * @param fn - The worker run function to wrap + * @returns A wrapped function that executes within a traced span + * + * @example + * ```ts + * const run = withWorkerTracing("feedWorker.run", async (job) => { + * // Your worker logic here + * }); + * ``` + */ +export function withWorkerTracing( + name: string, + fn: WorkerRunFn, +): WorkerRunFn { + return async (job: DequeuedJob): Promise => { + return await withSpan( + tracer, + name, + { + attributes: { + "job.id": job.id, + "job.priority": job.priority, + "job.runNumber": job.runNumber, + }, + }, + () => fn(job), + ); + }; +} -- cgit v1.2.3-70-g09d2