aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/workerTracing.ts
diff options
context:
space:
mode:
Diffstat (limited to 'apps/workers/workerTracing.ts')
-rw-r--r--apps/workers/workerTracing.ts43
1 files changed, 43 insertions, 0 deletions
diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts
new file mode 100644
index 00000000..3ff16d1c
--- /dev/null
+++ b/apps/workers/workerTracing.ts
@@ -0,0 +1,43 @@
+import type { DequeuedJob } from "@karakeep/shared/queueing";
+import { getTracer, withSpan } from "@karakeep/shared-server";
+
+const tracer = getTracer("@karakeep/workers");
+
+type WorkerRunFn<TData, TResult = void> = (
+ job: DequeuedJob<TData>,
+) => Promise<TResult>;
+
+/**
+ * Wraps a worker run function with OpenTelemetry tracing.
+ * Creates a span for each job execution and automatically handles error recording.
+ *
+ * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run")
+ * @param fn - The worker run function to wrap
+ * @returns A wrapped function that executes within a traced span
+ *
+ * @example
+ * ```ts
+ * const run = withWorkerTracing("feedWorker.run", async (job) => {
+ * // Your worker logic here
+ * });
+ * ```
+ */
+export function withWorkerTracing<TData, TResult = void>(
+ name: string,
+ fn: WorkerRunFn<TData, TResult>,
+): WorkerRunFn<TData, TResult> {
+ return async (job: DequeuedJob<TData>): Promise<TResult> => {
+ return await withSpan(
+ tracer,
+ name,
+ {
+ attributes: {
+ "job.id": job.id,
+ "job.priority": job.priority,
+ "job.runNumber": job.runNumber,
+ },
+ },
+ () => fn(job),
+ );
+ };
+}