| author | Mohamed Bassem <me@mbassem.com> | 2026-02-04 13:45:32 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2026-02-04 13:45:32 +0000 |
| commit | 538035c452bfbc042961b199c0f44e733c88bfab | |
| tree | b401a004a0ca3103264261170602fbf2822df14e | |
| parent | 93ad2e2001eb7070df50b0ab51dfd3e1ab377629 | |
| download | karakeep-538035c452bfbc042961b199c0f44e733c88bfab.tar.zst | |
feat: add extra instrumentation in the otel traces (#2453)
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | .gitignore | 1 |
| -rw-r--r-- | apps/workers/network.ts | 9 |
| -rw-r--r-- | apps/workers/workers/crawlerWorker.ts | 126 |
| -rw-r--r-- | apps/workers/workers/inference/summarize.ts | 32 |
| -rw-r--r-- | apps/workers/workers/inference/tagging.ts | 37 |
| -rw-r--r-- | packages/shared-server/src/tracing.ts | 12 |
| -rw-r--r-- | packages/shared-server/src/tracingTypes.ts | 45 |
7 files changed, 231 insertions, 31 deletions
```diff
diff --git a/.gitignore b/.gitignore
--- a/.gitignore
+++ b/.gitignore
@@ -66,3 +66,4 @@ auth_failures.log
 
 # Local directory for AI agent contexts
 .aicontext/
+.codex
diff --git a/apps/workers/network.ts b/apps/workers/network.ts
index 4bc30562..2ef8483f 100644
--- a/apps/workers/network.ts
+++ b/apps/workers/network.ts
@@ -86,6 +86,15 @@ function isAddressForbidden(address: string): boolean {
   return DISALLOWED_IP_RANGES.has(parsed.range());
 }
 
+export function getBookmarkDomain(url?: string | null): string | undefined {
+  if (!url) return undefined;
+  try {
+    return new URL(url).hostname;
+  } catch {
+    return undefined;
+  }
+}
+
 export type UrlValidationResult =
   | { ok: true; url: URL }
   | { ok: false; reason: string };
diff --git a/apps/workers/workers/crawlerWorker.ts b/apps/workers/workers/crawlerWorker.ts
index 597d45d3..d3c20b7c 100644
--- a/apps/workers/workers/crawlerWorker.ts
+++ b/apps/workers/workers/crawlerWorker.ts
@@ -30,6 +30,7 @@ import metascraperYoutube from "metascraper-youtube";
 import { crawlerStatusCodeCounter, workerStatsCounter } from "metrics";
 import {
   fetchWithProxy,
+  getBookmarkDomain,
   getRandomProxy,
   matchesNoProxy,
   validateUrl,
@@ -57,6 +58,7 @@ import {
   LinkCrawlerQueue,
   OpenAIQueue,
   QuotaService,
+  setSpanAttributes,
   triggerSearchReindex,
   triggerWebhook,
   VideoWorkerQueue,
@@ -435,7 +437,13 @@ async function browserlessCrawlPage(
   return await withSpan(
     tracer,
     "crawlerWorker.browserlessCrawlPage",
-    { attributes: { url, jobId } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+      },
+    },
     async () => {
       logger.info(
         `[Crawler][${jobId}] Running in browserless mode. Will do a plain http request to "${url}". Screenshots will be disabled.`,
@@ -473,7 +481,15 @@ async function crawlPage(
   return await withSpan(
     tracer,
     "crawlerWorker.crawlPage",
-    { attributes: { url, jobId, userId, forceStorePdf } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+        "user.id": userId,
+        "crawler.forceStorePdf": forceStorePdf,
+      },
+    },
     async () => {
       // Check user's browser crawling setting
       const userData = await db.query.users.findFirst({
@@ -717,7 +733,13 @@ async function extractMetadata(
   return await withSpan(
     tracer,
     "crawlerWorker.extractMetadata",
-    { attributes: { url, jobId } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+      },
+    },
     async () => {
       logger.info(
         `[Crawler][${jobId}] Will attempt to extract metadata from page ...`,
@@ -745,7 +767,13 @@ async function extractReadableContent(
   return await withSpan(
     tracer,
     "crawlerWorker.extractReadableContent",
-    { attributes: { url, jobId } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+      },
+    },
     async () => {
       logger.info(
         `[Crawler][${jobId}] Will attempt to extract readable content ...`,
@@ -790,9 +818,9 @@ async function storeScreenshot(
     "crawlerWorker.storeScreenshot",
     {
       attributes: {
-        jobId,
-        userId,
-        size: screenshot?.byteLength ?? 0,
+        "job.id": jobId,
+        "user.id": userId,
+        "asset.size": screenshot?.byteLength ?? 0,
       },
     },
     async () => {
@@ -849,9 +877,9 @@ async function storePdf(
     "crawlerWorker.storePdf",
     {
       attributes: {
-        jobId,
-        userId,
-        size: pdf?.byteLength ?? 0,
+        "job.id": jobId,
+        "user.id": userId,
+        "asset.size": pdf?.byteLength ?? 0,
       },
     },
     async () => {
@@ -902,7 +930,15 @@ async function downloadAndStoreFile(
   return await withSpan(
     tracer,
     "crawlerWorker.downloadAndStoreFile",
-    { attributes: { url, jobId, userId, fileType } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+        "user.id": userId,
+        "asset.type": fileType,
+      },
+    },
     async () => {
       let assetPath: string | undefined;
       try {
@@ -1018,7 +1054,14 @@ async function archiveWebpage(
   return await withSpan(
     tracer,
     "crawlerWorker.archiveWebpage",
-    { attributes: { url, jobId, userId } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+        "user.id": userId,
+      },
+    },
     async () => {
       logger.info(`[Crawler][${jobId}] Will attempt to archive page ...`);
       const assetId = newAssetId();
@@ -1103,7 +1146,13 @@ async function getContentType(
   return await withSpan(
     tracer,
     "crawlerWorker.getContentType",
-    { attributes: { url, jobId } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+      },
+    },
     async () => {
       try {
         logger.info(
@@ -1113,8 +1162,14 @@ async function getContentType(
         method: "GET",
         signal: AbortSignal.any([AbortSignal.timeout(5000), abortSignal]),
       });
+      setSpanAttributes({
+        "crawler.getContentType.statusCode": response.status,
+      });
       const rawContentType = response.headers.get("content-type");
       const contentType = normalizeContentType(rawContentType);
+      setSpanAttributes({
+        "crawler.contentType": contentType ?? undefined,
+      });
       logger.info(
         `[Crawler][${jobId}] Content-type for the url ${url} is "${contentType}"`,
       );
@@ -1148,7 +1203,16 @@ async function handleAsAssetBookmark(
   return await withSpan(
     tracer,
     "crawlerWorker.handleAsAssetBookmark",
-    { attributes: { url, jobId, userId, bookmarkId, assetType } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+        "user.id": userId,
+        "bookmark.id": bookmarkId,
+        "asset.type": assetType,
+      },
+    },
     async () => {
       const downloaded = await downloadAndStoreFile(
         url,
@@ -1218,9 +1282,11 @@ async function storeHtmlContent(
     "crawlerWorker.storeHtmlContent",
     {
       attributes: {
-        jobId,
-        userId,
-        contentSize: htmlContent ? Buffer.byteLength(htmlContent, "utf8") : 0,
+        "job.id": jobId,
+        "user.id": userId,
+        "bookmark.content.size": htmlContent
+          ? Buffer.byteLength(htmlContent, "utf8")
+          : 0,
       },
     },
     async () => {
@@ -1302,13 +1368,14 @@ async function crawlAndParseUrl(
     "crawlerWorker.crawlAndParseUrl",
     {
       attributes: {
-        url,
-        jobId,
-        userId,
-        bookmarkId,
-        archiveFullPage,
-        forceStorePdf,
-        hasPrecrawledArchive: !!precrawledArchiveAssetId,
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+        "user.id": userId,
+        "bookmark.id": bookmarkId,
+        "crawler.archiveFullPage": archiveFullPage,
+        "crawler.forceStorePdf": forceStorePdf,
+        "crawler.hasPrecrawledArchive": !!precrawledArchiveAssetId,
       },
     },
     async () => {
@@ -1357,6 +1424,9 @@ async function crawlAndParseUrl(
     // Track status code in Prometheus
     if (statusCode !== null) {
       crawlerStatusCodeCounter.labels(statusCode.toString()).inc();
+      setSpanAttributes({
+        "crawler.statusCode": statusCode,
+      });
     }
 
     const meta = await Promise.race([
@@ -1576,7 +1646,13 @@ async function checkDomainRateLimit(url: string, jobId: string): Promise<void> {
   return await withSpan(
     tracer,
     "crawlerWorker.checkDomainRateLimit",
-    { attributes: { url, jobId } },
+    {
+      attributes: {
+        "bookmark.url": url,
+        "bookmark.domain": getBookmarkDomain(url),
+        "job.id": jobId,
+      },
+    },
     async () => {
       const crawlerDomainRateLimitConfig =
         serverConfig.crawler.domainRatelimiting;
diff --git a/apps/workers/workers/inference/summarize.ts b/apps/workers/workers/inference/summarize.ts
index 094c46ca..922eb5b7 100644
--- a/apps/workers/workers/inference/summarize.ts
+++ b/apps/workers/workers/inference/summarize.ts
@@ -1,8 +1,13 @@
 import { and, eq } from "drizzle-orm";
+import { getBookmarkDomain } from "network";
 
 import { db } from "@karakeep/db";
 import { bookmarks, customPrompts, users } from "@karakeep/db/schema";
-import { triggerSearchReindex, ZOpenAIRequest } from "@karakeep/shared-server";
+import {
+  setSpanAttributes,
+  triggerSearchReindex,
+  ZOpenAIRequest,
+} from "@karakeep/shared-server";
 import serverConfig from "@karakeep/shared/config";
 import { InferenceClient } from "@karakeep/shared/inference";
 import logger from "@karakeep/shared/logger";
@@ -22,6 +27,7 @@ async function fetchBookmarkDetailsForSummary(bookmarkId: string) {
       description: true,
       htmlContent: true,
       contentAssetId: true,
+      crawlStatusCode: true,
       publisher: true,
       author: true,
       url: true,
@@ -65,6 +71,17 @@ export async function runSummarization(
     },
   });
 
+  setSpanAttributes({
+    "user.id": bookmarkData.userId,
+    "bookmark.id": bookmarkData.id,
+    "bookmark.url": bookmarkData.link?.url,
+    "bookmark.domain": getBookmarkDomain(bookmarkData.link?.url),
+    "bookmark.content.type": bookmarkData.type,
+    "crawler.statusCode": bookmarkData.link?.crawlStatusCode ?? undefined,
+    "inference.type": "summarization",
+    "inference.model": serverConfig.inference.textModel,
+  });
+
   if (userSettings?.autoSummarizationEnabled === false) {
     logger.debug(
       `[inference][${jobId}] Skipping summarization job for bookmark with id "${bookmarkId}" because user has disabled auto-summarization.`,
@@ -121,6 +138,10 @@ URL: ${link.url ?? ""}
     },
   });
 
+  setSpanAttributes({
+    "inference.prompt.customCount": prompts.length,
+  });
+
   const summaryPrompt = await buildSummaryPrompt(
     userSettings?.inferredTagLang ?? serverConfig.inference.inferredTagLang,
     prompts.map((p) => p.text),
@@ -128,6 +149,10 @@ URL: ${link.url ?? ""}
     serverConfig.inference.contextLength,
   );
 
+  setSpanAttributes({
+    "inference.prompt.size": Buffer.byteLength(summaryPrompt, "utf8"),
+  });
+
   const summaryResult = await inferenceClient.inferFromText(summaryPrompt, {
     schema: null, // Summaries are typically free-form text
     abortSignal: job.abortSignal,
@@ -139,6 +164,11 @@ URL: ${link.url ?? ""}
     );
   }
 
+  setSpanAttributes({
+    "inference.summary.size": Buffer.byteLength(summaryResult.response, "utf8"),
+    "inference.totalTokens": summaryResult.totalTokens,
+  });
+
   logger.info(
     `[inference][${jobId}] Generated summary for bookmark "${bookmarkId}" using ${summaryResult.totalTokens} tokens.`,
   );
diff --git a/apps/workers/workers/inference/tagging.ts b/apps/workers/workers/inference/tagging.ts
index 376eab14..b3006193 100644
--- a/apps/workers/workers/inference/tagging.ts
+++ b/apps/workers/workers/inference/tagging.ts
@@ -1,4 +1,5 @@
 import { and, eq, inArray } from "drizzle-orm";
+import { getBookmarkDomain } from "network";
 import { buildImpersonatingTRPCClient } from "trpc";
 import { z } from "zod";
 
@@ -17,6 +18,7 @@ import {
   users,
 } from "@karakeep/db/schema";
 import {
+  setSpanAttributes,
   triggerRuleEngineOnEvent,
   triggerSearchReindex,
   triggerWebhook,
@@ -150,6 +152,9 @@ async function inferTagsFromImage(
   }
   const base64 = asset.toString("base64");
 
+  setSpanAttributes({
+    "inference.model": serverConfig.inference.imageModel,
+  });
   return inferenceClient.inferFromImage(
     buildImagePrompt(
       inferredTagLang,
@@ -176,6 +181,10 @@ async function fetchCustomPrompts(
     },
   });
 
+  setSpanAttributes({
+    "inference.prompt.customCount": prompts.length,
+  });
+
   let promptTexts = prompts.map((p) => p.text);
   if (containsTagsPlaceholder(prompts)) {
     promptTexts = await replaceTagsPlaceholders(promptTexts, userId);
@@ -234,6 +243,12 @@ async function inferTagsFromPDF(
     serverConfig.inference.contextLength,
     tagStyle,
   );
+  setSpanAttributes({
+    "inference.model": serverConfig.inference.textModel,
+  });
+  setSpanAttributes({
+    "inference.prompt.size": Buffer.byteLength(prompt, "utf8"),
+  });
   return inferenceClient.inferFromText(prompt, {
     schema: openAIResponseSchema,
     abortSignal,
@@ -251,6 +266,12 @@ async function inferTagsFromText(
   if (!prompt) {
     return null;
   }
+  setSpanAttributes({
+    "inference.model": serverConfig.inference.textModel,
+  });
+  setSpanAttributes({
+    "inference.prompt.size": Buffer.byteLength(prompt, "utf8"),
+  });
   return await inferenceClient.inferFromText(prompt, {
     schema: openAIResponseSchema,
     abortSignal,
@@ -265,6 +286,18 @@ async function inferTags(
   tagStyle: ZTagStyle,
   inferredTagLang: string,
 ) {
+  setSpanAttributes({
+    "user.id": bookmark.userId,
+    "bookmark.id": bookmark.id,
+    "bookmark.url": bookmark.link?.url,
+    "bookmark.domain": getBookmarkDomain(bookmark.link?.url),
+    "bookmark.content.type": bookmark.type,
+    "crawler.statusCode": bookmark.link?.crawlStatusCode ?? undefined,
+    "inference.tagging.style": tagStyle,
+    "inference.lang": inferredTagLang,
+    "inference.type": "tagging",
+  });
+
   let response: InferenceResponse | null;
   if (bookmark.link || bookmark.text) {
     response = await inferTagsFromText(
@@ -325,6 +358,10 @@ async function inferTags(
       }
       return tag.trim();
     });
+    setSpanAttributes({
+      "inference.tagging.numGeneratedTags": tags.length,
+      "inference.totalTokens": response.totalTokens,
+    });
 
     return tags;
   } catch (e) {
diff --git a/packages/shared-server/src/tracing.ts b/packages/shared-server/src/tracing.ts
index e831e019..10222f88 100644
--- a/packages/shared-server/src/tracing.ts
+++ b/packages/shared-server/src/tracing.ts
@@ -24,6 +24,10 @@ import {
 import serverConfig from "@karakeep/shared/config";
 import logger from "@karakeep/shared/logger";
 
+import type { TracingAttributes } from "./tracingTypes";
+
+export type { TracingAttributeKey, TracingAttributes } from "./tracingTypes";
+
 let tracerProvider: NodeTracerProvider | null = null;
 let isInitialized = false;
 
@@ -129,7 +133,7 @@ export async function withSpan<T>(
   spanName: string,
   options: {
     kind?: SpanKind;
-    attributes?: Record<string, string | number | boolean>;
+    attributes?: TracingAttributes;
   },
   fn: (span: Span) => Promise<T>,
 ): Promise<T> {
@@ -168,7 +172,7 @@ export function withSpanSync<T>(
   spanName: string,
   options: {
     kind?: SpanKind;
-    attributes?: Record<string, string | number | boolean>;
+    attributes?: TracingAttributes;
   },
   fn: (span: Span) => T,
 ): T {
@@ -213,9 +217,7 @@ export function addSpanEvent(
 /**
  * Set attributes on the current active span.
  */
-export function setSpanAttributes(
-  attributes: Record<string, string | number | boolean>,
-): void {
+export function setSpanAttributes(attributes: TracingAttributes): void {
   const span = getActiveSpan();
   if (span) {
     span.setAttributes(attributes);
diff --git a/packages/shared-server/src/tracingTypes.ts b/packages/shared-server/src/tracingTypes.ts
new file mode 100644
index 00000000..f397fa6f
--- /dev/null
+++ b/packages/shared-server/src/tracingTypes.ts
@@ -0,0 +1,45 @@
+export type TracingAttributeKey =
+  // User attributes
+  | "user.id"
+  | "user.role"
+  | "user.tier"
+  // RPC attributes
+  | "rpc.system"
+  | "rpc.method"
+  | "rpc.type"
+  // Job attributes
+  | "job.id"
+  | "job.priority"
+  | "job.runNumber"
+  | "job.groupId"
+  // Bookmark attributes
+  | "bookmark.id"
+  | "bookmark.url"
+  | "bookmark.domain"
+  | "bookmark.content.size"
+  | "bookmark.content.type"
+  // Asset attributes
+  | "asset.id"
+  | "asset.type"
+  | "asset.size"
+  // Crawler-specific attributes
+  | "crawler.forceStorePdf"
+  | "crawler.archiveFullPage"
+  | "crawler.hasPrecrawledArchive"
+  | "crawler.getContentType.statusCode"
+  | "crawler.contentType"
+  | "crawler.statusCode"
+  // Inference-specific attributes
+  | "inference.tagging.numGeneratedTags"
+  | "inference.tagging.style"
+  | "inference.summary.size"
+  | "inference.lang"
+  | "inference.prompt.size"
+  | "inference.prompt.customCount"
+  | "inference.totalTokens"
+  | "inference.model"
+  | "inference.type";
+
+export type TracingAttributes = Partial<
+  Record<TracingAttributeKey, string | number | boolean>
+>;
```
