aboutsummaryrefslogtreecommitdiffstats
path: root/apps/workers/workerTracing.ts
diff options
context:
space:
mode:
authorMohamed Bassem <me@mbassem.com>2025-12-30 12:43:08 +0200
committerGitHub <noreply@github.com>2025-12-30 10:43:08 +0000
commit7ab7db8e48360417498643eec2384b0fcb7fbdfb (patch)
tree2f9a789117cbb0725be771d8203c811da1c02b0b /apps/workers/workerTracing.ts
parentd852ee1a69b7ade4bf1e4e03a0bd30af82543414 (diff)
downloadkarakeep-7ab7db8e48360417498643eec2384b0fcb7fbdfb.tar.zst
chore: worker tracing (#2321)
Diffstat (limited to 'apps/workers/workerTracing.ts')
-rw-r--r--apps/workers/workerTracing.ts43
1 files changed, 43 insertions, 0 deletions
diff --git a/apps/workers/workerTracing.ts b/apps/workers/workerTracing.ts
new file mode 100644
index 00000000..3ff16d1c
--- /dev/null
+++ b/apps/workers/workerTracing.ts
@@ -0,0 +1,43 @@
+import type { DequeuedJob } from "@karakeep/shared/queueing";
+import { getTracer, withSpan } from "@karakeep/shared-server";
+
+const tracer = getTracer("@karakeep/workers");
+
+type WorkerRunFn<TData, TResult = void> = (
+ job: DequeuedJob<TData>,
+) => Promise<TResult>;
+
+/**
+ * Wraps a worker run function with OpenTelemetry tracing.
+ * Creates a span for each job execution and automatically handles error recording.
+ *
+ * @param name - The name of the span (e.g., "feedWorker.run", "crawlerWorker.run")
+ * @param fn - The worker run function to wrap
+ * @returns A wrapped function that executes within a traced span
+ *
+ * @example
+ * ```ts
+ * const run = withWorkerTracing("feedWorker.run", async (job) => {
+ * // Your worker logic here
+ * });
+ * ```
+ */
+export function withWorkerTracing<TData, TResult = void>(
+ name: string,
+ fn: WorkerRunFn<TData, TResult>,
+): WorkerRunFn<TData, TResult> {
+ return async (job: DequeuedJob<TData>): Promise<TResult> => {
+ return await withSpan(
+ tracer,
+ name,
+ {
+ attributes: {
+ "job.id": job.id,
+ "job.priority": job.priority,
+ "job.runNumber": job.runNumber,
+ },
+ },
+ () => fn(job),
+ );
+ };
+}