diff options
Diffstat (limited to '')
| -rw-r--r-- | packages/shared-server/src/index.ts | 1 | ||||
| -rw-r--r-- | packages/shared-server/src/queues.ts | 162 | ||||
| -rw-r--r-- | packages/shared-server/src/services/quotaService.ts | 5 | ||||
| -rw-r--r-- | packages/shared-server/src/tracing.ts | 274 | ||||
| -rw-r--r-- | packages/shared-server/src/tracingTypes.ts | 49 |
5 files changed, 442 insertions, 49 deletions
diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts index d42118c2..8e3b2e52 100644 --- a/packages/shared-server/src/index.ts +++ b/packages/shared-server/src/index.ts @@ -1,3 +1,4 @@ export { loadAllPlugins } from "./plugins"; export { QuotaService, StorageQuotaError } from "./services/quotaService"; export * from "./queues"; +export * from "./tracing"; diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts index 8ee50df0..4d4a61d6 100644 --- a/packages/shared-server/src/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -1,19 +1,80 @@ import { z } from "zod"; -import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { + EnqueueOptions, + getQueueClient, + Queue, + QueueClient, + QueueOptions, +} from "@karakeep/shared/queueing"; import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; import { loadAllPlugins } from "."; -await loadAllPlugins(); -const QUEUE_CLIENT = await getQueueClient(); +export enum QueuePriority { + Low = 50, + Default = 0, +} + +// Lazy client initialization - plugins are loaded on first access +// We cache the promise to ensure only one initialization happens even with concurrent calls +let clientPromise: Promise<QueueClient> | null = null; + +function getClient(): Promise<QueueClient> { + if (!clientPromise) { + clientPromise = (async () => { + await loadAllPlugins(); + return await getQueueClient(); + })(); + } + return clientPromise; +} + +/** + * Creates a deferred queue that initializes lazily on first use. + * This allows the module to be imported without requiring plugins to be loaded. + */ +function createDeferredQueue<T>(name: string, options: QueueOptions): Queue<T> { + // Cache the promise to ensure only one queue is created even with concurrent calls + let queuePromise: Promise<Queue<T>> | null = null; + + const ensureQueue = (): Promise<Queue<T>> => { + if (!queuePromise) { + queuePromise = (async () => { + const client = await getClient(); + return client.createQueue<T>(name, options); + })(); + } + return queuePromise; + }; + + return { + opts: options, + name: () => name, + ensureInit: async () => { + await ensureQueue(); + }, + async enqueue(payload: T, opts?: EnqueueOptions) { + return (await ensureQueue()).enqueue(payload, opts); + }, + async stats() { + return (await ensureQueue()).stats(); + }, + async cancelAllNonRunning() { + const q = await ensureQueue(); + return q.cancelAllNonRunning?.() ?? 0; + }, + }; +} export async function prepareQueue() { - await QUEUE_CLIENT.prepare(); + const client = await getClient(); + await client.prepare(); } export async function startQueue() { - await QUEUE_CLIENT.start(); + const client = await getClient(); + await client.start(); } // Link Crawler @@ -21,10 +82,11 @@ export const zCrawlLinkRequestSchema = z.object({ bookmarkId: z.string(), runInference: z.boolean().optional(), archiveFullPage: z.boolean().optional().default(false), + storePdf: z.boolean().optional().default(false), }); export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>; -export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>( +export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>( "link_crawler_queue", { defaultJobArgs: { @@ -34,6 +96,18 @@ export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>( }, ); +// Separate queue for low priority link crawling (e.g. imports) +// This prevents low priority crawling from impacting the parallelism of the main queue +export const LowPriorityCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>( + "low_priority_crawler_queue", + { + defaultJobArgs: { + numRetries: 5, + }, + keepFailedJobs: false, + }, +); + // Inference Worker export const zOpenAIRequestSchema = z.object({ bookmarkId: z.string(), @@ -41,15 +115,12 @@ export const zOpenAIRequestSchema = z.object({ }); export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>; -export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>( - "openai_queue", - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, +export const OpenAIQueue = createDeferredQueue<ZOpenAIRequest>("openai_queue", { + defaultJobArgs: { + numRetries: 3, }, -); + keepFailedJobs: false, +}); // Search Indexing Worker export const zSearchIndexingRequestSchema = z.object({ @@ -59,13 +130,15 @@ export const zSearchIndexingRequestSchema = z.object({ export type ZSearchIndexingRequest = z.infer< typeof zSearchIndexingRequestSchema >; -export const SearchIndexingQueue = - QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", { +export const SearchIndexingQueue = createDeferredQueue<ZSearchIndexingRequest>( + "searching_indexing", + { defaultJobArgs: { numRetries: 5, }, keepFailedJobs: false, - }); + }, +); // Admin maintenance worker export const zTidyAssetsRequestSchema = z.object({ @@ -95,13 +168,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract< { type: "migrate_large_link_html" } >; -export const AdminMaintenanceQueue = - QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", { +export const AdminMaintenanceQueue = createDeferredQueue<ZAdminMaintenanceTask>( + "admin_maintenance_queue", + { defaultJobArgs: { numRetries: 1, }, keepFailedJobs: false, - }); + }, +); export async function triggerSearchReindex( bookmarkId: string, @@ -125,7 +200,7 @@ export const zvideoRequestSchema = z.object({ }); export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>; -export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>( +export const VideoWorkerQueue = createDeferredQueue<ZVideoRequest>( "video_queue", { defaultJobArgs: { @@ -141,16 +216,13 @@ export const zFeedRequestSchema = z.object({ }); export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>; -export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>( - "feed_queue", - { - defaultJobArgs: { - // One retry is enough for the feed queue given that it's periodic - numRetries: 1, - }, - keepFailedJobs: false, +export const FeedQueue = createDeferredQueue<ZFeedRequestSchema>("feed_queue", { + defaultJobArgs: { + // One retry is enough for the feed queue given that it's periodic + numRetries: 1, }, -); + keepFailedJobs: false, +}); // Preprocess Assets export const zAssetPreprocessingRequestSchema = z.object({ @@ -161,15 +233,12 @@ export type AssetPreprocessingRequest = z.infer< typeof zAssetPreprocessingRequestSchema >; export const AssetPreprocessingQueue = - QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>( - "asset_preprocessing_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, + createDeferredQueue<AssetPreprocessingRequest>("asset_preprocessing_queue", { + defaultJobArgs: { + numRetries: 2, }, - ); + keepFailedJobs: false, + }); // Webhook worker export const zWebhookRequestSchema = z.object({ @@ -178,7 +247,7 @@ export const zWebhookRequestSchema = z.object({ userId: z.string().optional(), }); export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>; -export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>( +export const WebhookQueue = createDeferredQueue<ZWebhookRequest>( "webhook_queue", { defaultJobArgs: { @@ -210,7 +279,7 @@ export const zRuleEngineRequestSchema = z.object({ events: z.array(zRuleEngineEventSchema), }); export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>; -export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>( +export const RuleEngineQueue = createDeferredQueue<ZRuleEngineRequest>( "rule_engine_queue", { defaultJobArgs: { @@ -240,12 +309,9 @@ export const zBackupRequestSchema = z.object({ backupId: z.string().optional(), }); export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>; -export const BackupQueue = QUEUE_CLIENT.createQueue<ZBackupRequest>( - "backup_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, +export const BackupQueue = createDeferredQueue<ZBackupRequest>("backup_queue", { + defaultJobArgs: { + numRetries: 2, }, -); + keepFailedJobs: false, +}); diff --git a/packages/shared-server/src/services/quotaService.ts b/packages/shared-server/src/services/quotaService.ts index a09b76bf..bedda881 100644 --- a/packages/shared-server/src/services/quotaService.ts +++ b/packages/shared-server/src/services/quotaService.ts @@ -22,7 +22,10 @@ export class StorageQuotaError extends Error { export class QuotaService { // TODO: Use quota approval tokens for bookmark creation when // bookmark creation logic is in the model. - static async canCreateBookmark(db: DB, userId: string) { + static async canCreateBookmark( + db: DB | KarakeepDBTransaction, + userId: string, + ) { const user = await db.query.users.findFirst({ where: eq(users.id, userId), columns: { diff --git a/packages/shared-server/src/tracing.ts b/packages/shared-server/src/tracing.ts new file mode 100644 index 00000000..10222f88 --- /dev/null +++ b/packages/shared-server/src/tracing.ts @@ -0,0 +1,274 @@ +import type { Context, Span, Tracer } from "@opentelemetry/api"; +import { + context, + propagation, + SpanKind, + SpanStatusCode, + trace, +} from "@opentelemetry/api"; +import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http"; +import { resourceFromAttributes } from "@opentelemetry/resources"; +import { + BatchSpanProcessor, + ConsoleSpanExporter, + ParentBasedSampler, + SimpleSpanProcessor, + TraceIdRatioBasedSampler, +} from "@opentelemetry/sdk-trace-base"; +import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node"; +import { + ATTR_SERVICE_NAME, + ATTR_SERVICE_VERSION, +} from "@opentelemetry/semantic-conventions"; + +import serverConfig from "@karakeep/shared/config"; +import logger from "@karakeep/shared/logger"; + +import type { TracingAttributes } from "./tracingTypes"; + +export type { TracingAttributeKey, TracingAttributes } from "./tracingTypes"; + +let tracerProvider: NodeTracerProvider | null = null; +let isInitialized = false; + +/** + * Initialize the OpenTelemetry tracing infrastructure. + * Should be called once at application startup. + */ +export function initTracing(serviceSuffix?: string): void { + if (isInitialized) { + logger.debug("Tracing already initialized, skipping"); + return; + } + + if (!serverConfig.tracing.enabled) { + logger.info("Tracing is disabled"); + isInitialized = true; + return; + } + + const serviceName = serviceSuffix + ? `${serverConfig.tracing.serviceName}-${serviceSuffix}` + : serverConfig.tracing.serviceName; + + logger.info(`Initializing OpenTelemetry tracing for service: ${serviceName}`); + + const resource = resourceFromAttributes({ + [ATTR_SERVICE_NAME]: serviceName, + [ATTR_SERVICE_VERSION]: serverConfig.serverVersion ?? "unknown", + }); + + // Configure span processors + const spanProcessors = []; + + if (serverConfig.tracing.otlpEndpoint) { + // OTLP exporter (Jaeger, Zipkin, etc.) + const otlpExporter = new OTLPTraceExporter({ + url: serverConfig.tracing.otlpEndpoint, + }); + spanProcessors.push(new BatchSpanProcessor(otlpExporter)); + logger.info( + `OTLP exporter configured: ${serverConfig.tracing.otlpEndpoint}`, + ); + } else { + // Fallback to console exporter for development + spanProcessors.push(new SimpleSpanProcessor(new ConsoleSpanExporter())); + logger.info("Console span exporter configured (no OTLP endpoint set)"); + } + + tracerProvider = new NodeTracerProvider({ + resource, + sampler: new ParentBasedSampler({ + root: new TraceIdRatioBasedSampler(serverConfig.tracing.sampleRate), + }), + spanProcessors, + }); + + // Register the provider globally + tracerProvider.register(); + + isInitialized = true; + logger.info("OpenTelemetry tracing initialized successfully"); +} + +/** + * Shutdown the tracing infrastructure gracefully. + * Should be called on application shutdown. + */ +export async function shutdownTracing(): Promise<void> { + if (tracerProvider) { + await tracerProvider.shutdown(); + logger.info("OpenTelemetry tracing shut down"); + } +} + +/** + * Get a tracer instance for creating spans. + * @param name - The name of the tracer (typically the module/component name) + */ +export function getTracer(name: string): Tracer { + return trace.getTracer(name); +} + +/** + * Get the currently active span, if any. + */ +export function getActiveSpan(): Span | undefined { + return trace.getActiveSpan(); +} + +/** + * Get the current trace context. + */ +export function getActiveContext(): Context { + return context.active(); +} + +/** + * Execute a function within a new span. + * Automatically handles error recording and span status. + */ +export async function withSpan<T>( + tracer: Tracer, + spanName: string, + options: { + kind?: SpanKind; + attributes?: TracingAttributes; + }, + fn: (span: Span) => Promise<T>, +): Promise<T> { + return tracer.startActiveSpan( + spanName, + { + kind: options.kind ?? SpanKind.INTERNAL, + attributes: options.attributes, + }, + async (span) => { + try { + const result = await fn(span); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + span.recordException( + error instanceof Error ? error : new Error(String(error)), + ); + throw error; + } finally { + span.end(); + } + }, + ); +} + +/** + * Execute a synchronous function within a new span. + */ +export function withSpanSync<T>( + tracer: Tracer, + spanName: string, + options: { + kind?: SpanKind; + attributes?: TracingAttributes; + }, + fn: (span: Span) => T, +): T { + const span = tracer.startSpan(spanName, { + kind: options.kind ?? SpanKind.INTERNAL, + attributes: options.attributes, + }); + + try { + const result = context.with(trace.setSpan(context.active(), span), () => + fn(span), + ); + span.setStatus({ code: SpanStatusCode.OK }); + return result; + } catch (error) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error instanceof Error ? error.message : String(error), + }); + span.recordException( + error instanceof Error ? error : new Error(String(error)), + ); + throw error; + } finally { + span.end(); + } +} + +/** + * Add an event to the current active span. + */ +export function addSpanEvent( + name: string, + attributes?: Record<string, string | number | boolean>, +): void { + const span = getActiveSpan(); + if (span) { + span.addEvent(name, attributes); + } +} + +/** + * Set attributes on the current active span. + */ +export function setSpanAttributes(attributes: TracingAttributes): void { + const span = getActiveSpan(); + if (span) { + span.setAttributes(attributes); + } +} + +/** + * Record an error on the current active span. + */ +export function recordSpanError(error: Error): void { + const span = getActiveSpan(); + if (span) { + span.recordException(error); + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message, + }); + } +} + +/** + * Extract trace context from HTTP headers (for distributed tracing). + */ +export function extractTraceContext( + headers: Record<string, string | string[] | undefined>, +): Context { + const normalizedHeaders: Record<string, string> = {}; + for (const [key, value] of Object.entries(headers)) { + if (value) { + normalizedHeaders[key] = Array.isArray(value) ? value[0] : value; + } + } + return propagation.extract(context.active(), normalizedHeaders); +} + +/** + * Inject trace context into HTTP headers (for distributed tracing). + */ +export function injectTraceContext( + headers: Record<string, string>, +): Record<string, string> { + propagation.inject(context.active(), headers); + return headers; +} + +/** + * Run a function within a specific context. + */ +export function runWithContext<T>(ctx: Context, fn: () => T): T { + return context.with(ctx, fn); +} + +// Re-export commonly used types and constants +export { SpanKind, SpanStatusCode } from "@opentelemetry/api"; diff --git a/packages/shared-server/src/tracingTypes.ts b/packages/shared-server/src/tracingTypes.ts new file mode 100644 index 00000000..f0926b97 --- /dev/null +++ b/packages/shared-server/src/tracingTypes.ts @@ -0,0 +1,49 @@ +export type TracingAttributeKey = + // User attributes + | "user.id" + | "user.role" + | "user.tier" + // RPC attributes + | "rpc.system" + | "rpc.method" + | "rpc.type" + // Job attributes + | "job.id" + | "job.priority" + | "job.runNumber" + | "job.groupId" + // Bookmark attributes + | "bookmark.id" + | "bookmark.url" + | "bookmark.domain" + | "bookmark.content.size" + | "bookmark.content.type" + // Asset attributes + | "asset.id" + | "asset.type" + | "asset.size" + // Crawler-specific attributes + | "crawler.forceStorePdf" + | "crawler.archiveFullPage" + | "crawler.hasPrecrawledArchive" + | "crawler.getContentType.statusCode" + | "crawler.contentType" + | "crawler.statusCode" + // Database attributes + | "db.system" + | "db.statement" + | "db.operation" + // Inference-specific attributes + | "inference.tagging.numGeneratedTags" + | "inference.tagging.style" + | "inference.summary.size" + | "inference.lang" + | "inference.prompt.size" + | "inference.prompt.customCount" + | "inference.totalTokens" + | "inference.model" + | "inference.type"; + +export type TracingAttributes = Partial< + Record<TracingAttributeKey, string | number | boolean> +>; |
