aboutsummaryrefslogtreecommitdiffstats
path: root/packages/shared-server/src
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--packages/shared-server/src/index.ts1
-rw-r--r--packages/shared-server/src/queues.ts162
-rw-r--r--packages/shared-server/src/services/quotaService.ts5
-rw-r--r--packages/shared-server/src/tracing.ts274
-rw-r--r--packages/shared-server/src/tracingTypes.ts49
5 files changed, 442 insertions, 49 deletions
diff --git a/packages/shared-server/src/index.ts b/packages/shared-server/src/index.ts
index d42118c2..8e3b2e52 100644
--- a/packages/shared-server/src/index.ts
+++ b/packages/shared-server/src/index.ts
@@ -1,3 +1,4 @@
export { loadAllPlugins } from "./plugins";
export { QuotaService, StorageQuotaError } from "./services/quotaService";
export * from "./queues";
+export * from "./tracing";
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts
index 8ee50df0..4d4a61d6 100644
--- a/packages/shared-server/src/queues.ts
+++ b/packages/shared-server/src/queues.ts
@@ -1,19 +1,80 @@
import { z } from "zod";
-import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing";
+import {
+ EnqueueOptions,
+ getQueueClient,
+ Queue,
+ QueueClient,
+ QueueOptions,
+} from "@karakeep/shared/queueing";
import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules";
import { loadAllPlugins } from ".";
-await loadAllPlugins();
-const QUEUE_CLIENT = await getQueueClient();
+export enum QueuePriority {
+ Low = 50,
+ Default = 0,
+}
+
+// Lazy client initialization - plugins are loaded on first access
+// We cache the promise to ensure only one initialization happens even with concurrent calls
+let clientPromise: Promise<QueueClient> | null = null;
+
+function getClient(): Promise<QueueClient> {
+ if (!clientPromise) {
+ clientPromise = (async () => {
+ await loadAllPlugins();
+ return await getQueueClient();
+ })();
+ }
+ return clientPromise;
+}
+
+/**
+ * Creates a deferred queue that initializes lazily on first use.
+ * This allows the module to be imported without requiring plugins to be loaded.
+ */
+function createDeferredQueue<T>(name: string, options: QueueOptions): Queue<T> {
+ // Cache the promise to ensure only one queue is created even with concurrent calls
+ let queuePromise: Promise<Queue<T>> | null = null;
+
+ const ensureQueue = (): Promise<Queue<T>> => {
+ if (!queuePromise) {
+ queuePromise = (async () => {
+ const client = await getClient();
+ return client.createQueue<T>(name, options);
+ })();
+ }
+ return queuePromise;
+ };
+
+ return {
+ opts: options,
+ name: () => name,
+ ensureInit: async () => {
+ await ensureQueue();
+ },
+ async enqueue(payload: T, opts?: EnqueueOptions) {
+ return (await ensureQueue()).enqueue(payload, opts);
+ },
+ async stats() {
+ return (await ensureQueue()).stats();
+ },
+ async cancelAllNonRunning() {
+ const q = await ensureQueue();
+ return q.cancelAllNonRunning?.() ?? 0;
+ },
+ };
+}
export async function prepareQueue() {
- await QUEUE_CLIENT.prepare();
+ const client = await getClient();
+ await client.prepare();
}
export async function startQueue() {
- await QUEUE_CLIENT.start();
+ const client = await getClient();
+ await client.start();
}
// Link Crawler
@@ -21,10 +82,11 @@ export const zCrawlLinkRequestSchema = z.object({
bookmarkId: z.string(),
runInference: z.boolean().optional(),
archiveFullPage: z.boolean().optional().default(false),
+ storePdf: z.boolean().optional().default(false),
});
export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>;
-export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>(
+export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
"link_crawler_queue",
{
defaultJobArgs: {
@@ -34,6 +96,18 @@ export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>(
},
);
+// Separate queue for low priority link crawling (e.g. imports)
+// This prevents low priority crawling from impacting the parallelism of the main queue
+export const LowPriorityCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>(
+ "low_priority_crawler_queue",
+ {
+ defaultJobArgs: {
+ numRetries: 5,
+ },
+ keepFailedJobs: false,
+ },
+);
+
// Inference Worker
export const zOpenAIRequestSchema = z.object({
bookmarkId: z.string(),
@@ -41,15 +115,12 @@ export const zOpenAIRequestSchema = z.object({
});
export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>;
-export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>(
- "openai_queue",
- {
- defaultJobArgs: {
- numRetries: 3,
- },
- keepFailedJobs: false,
+export const OpenAIQueue = createDeferredQueue<ZOpenAIRequest>("openai_queue", {
+ defaultJobArgs: {
+ numRetries: 3,
},
-);
+ keepFailedJobs: false,
+});
// Search Indexing Worker
export const zSearchIndexingRequestSchema = z.object({
@@ -59,13 +130,15 @@ export const zSearchIndexingRequestSchema = z.object({
export type ZSearchIndexingRequest = z.infer<
typeof zSearchIndexingRequestSchema
>;
-export const SearchIndexingQueue =
- QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", {
+export const SearchIndexingQueue = createDeferredQueue<ZSearchIndexingRequest>(
+ "searching_indexing",
+ {
defaultJobArgs: {
numRetries: 5,
},
keepFailedJobs: false,
- });
+ },
+);
// Admin maintenance worker
export const zTidyAssetsRequestSchema = z.object({
@@ -95,13 +168,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract<
{ type: "migrate_large_link_html" }
>;
-export const AdminMaintenanceQueue =
- QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", {
+export const AdminMaintenanceQueue = createDeferredQueue<ZAdminMaintenanceTask>(
+ "admin_maintenance_queue",
+ {
defaultJobArgs: {
numRetries: 1,
},
keepFailedJobs: false,
- });
+ },
+);
export async function triggerSearchReindex(
bookmarkId: string,
@@ -125,7 +200,7 @@ export const zvideoRequestSchema = z.object({
});
export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>;
-export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>(
+export const VideoWorkerQueue = createDeferredQueue<ZVideoRequest>(
"video_queue",
{
defaultJobArgs: {
@@ -141,16 +216,13 @@ export const zFeedRequestSchema = z.object({
});
export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>;
-export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>(
- "feed_queue",
- {
- defaultJobArgs: {
- // One retry is enough for the feed queue given that it's periodic
- numRetries: 1,
- },
- keepFailedJobs: false,
+export const FeedQueue = createDeferredQueue<ZFeedRequestSchema>("feed_queue", {
+ defaultJobArgs: {
+ // One retry is enough for the feed queue given that it's periodic
+ numRetries: 1,
},
-);
+ keepFailedJobs: false,
+});
// Preprocess Assets
export const zAssetPreprocessingRequestSchema = z.object({
@@ -161,15 +233,12 @@ export type AssetPreprocessingRequest = z.infer<
typeof zAssetPreprocessingRequestSchema
>;
export const AssetPreprocessingQueue =
- QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>(
- "asset_preprocessing_queue",
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
+ createDeferredQueue<AssetPreprocessingRequest>("asset_preprocessing_queue", {
+ defaultJobArgs: {
+ numRetries: 2,
},
- );
+ keepFailedJobs: false,
+ });
// Webhook worker
export const zWebhookRequestSchema = z.object({
@@ -178,7 +247,7 @@ export const zWebhookRequestSchema = z.object({
userId: z.string().optional(),
});
export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>;
-export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>(
+export const WebhookQueue = createDeferredQueue<ZWebhookRequest>(
"webhook_queue",
{
defaultJobArgs: {
@@ -210,7 +279,7 @@ export const zRuleEngineRequestSchema = z.object({
events: z.array(zRuleEngineEventSchema),
});
export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>;
-export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>(
+export const RuleEngineQueue = createDeferredQueue<ZRuleEngineRequest>(
"rule_engine_queue",
{
defaultJobArgs: {
@@ -240,12 +309,9 @@ export const zBackupRequestSchema = z.object({
backupId: z.string().optional(),
});
export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>;
-export const BackupQueue = QUEUE_CLIENT.createQueue<ZBackupRequest>(
- "backup_queue",
- {
- defaultJobArgs: {
- numRetries: 2,
- },
- keepFailedJobs: false,
+export const BackupQueue = createDeferredQueue<ZBackupRequest>("backup_queue", {
+ defaultJobArgs: {
+ numRetries: 2,
},
-);
+ keepFailedJobs: false,
+});
diff --git a/packages/shared-server/src/services/quotaService.ts b/packages/shared-server/src/services/quotaService.ts
index a09b76bf..bedda881 100644
--- a/packages/shared-server/src/services/quotaService.ts
+++ b/packages/shared-server/src/services/quotaService.ts
@@ -22,7 +22,10 @@ export class StorageQuotaError extends Error {
export class QuotaService {
// TODO: Use quota approval tokens for bookmark creation when
// bookmark creation logic is in the model.
- static async canCreateBookmark(db: DB, userId: string) {
+ static async canCreateBookmark(
+ db: DB | KarakeepDBTransaction,
+ userId: string,
+ ) {
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: {
diff --git a/packages/shared-server/src/tracing.ts b/packages/shared-server/src/tracing.ts
new file mode 100644
index 00000000..10222f88
--- /dev/null
+++ b/packages/shared-server/src/tracing.ts
@@ -0,0 +1,274 @@
+import type { Context, Span, Tracer } from "@opentelemetry/api";
+import {
+ context,
+ propagation,
+ SpanKind,
+ SpanStatusCode,
+ trace,
+} from "@opentelemetry/api";
+import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
+import { resourceFromAttributes } from "@opentelemetry/resources";
+import {
+ BatchSpanProcessor,
+ ConsoleSpanExporter,
+ ParentBasedSampler,
+ SimpleSpanProcessor,
+ TraceIdRatioBasedSampler,
+} from "@opentelemetry/sdk-trace-base";
+import { NodeTracerProvider } from "@opentelemetry/sdk-trace-node";
+import {
+ ATTR_SERVICE_NAME,
+ ATTR_SERVICE_VERSION,
+} from "@opentelemetry/semantic-conventions";
+
+import serverConfig from "@karakeep/shared/config";
+import logger from "@karakeep/shared/logger";
+
+import type { TracingAttributes } from "./tracingTypes";
+
+export type { TracingAttributeKey, TracingAttributes } from "./tracingTypes";
+
+let tracerProvider: NodeTracerProvider | null = null;
+let isInitialized = false;
+
+/**
+ * Initialize the OpenTelemetry tracing infrastructure.
+ * Should be called once at application startup.
+ */
+export function initTracing(serviceSuffix?: string): void {
+ if (isInitialized) {
+ logger.debug("Tracing already initialized, skipping");
+ return;
+ }
+
+ if (!serverConfig.tracing.enabled) {
+ logger.info("Tracing is disabled");
+ isInitialized = true;
+ return;
+ }
+
+ const serviceName = serviceSuffix
+ ? `${serverConfig.tracing.serviceName}-${serviceSuffix}`
+ : serverConfig.tracing.serviceName;
+
+ logger.info(`Initializing OpenTelemetry tracing for service: ${serviceName}`);
+
+ const resource = resourceFromAttributes({
+ [ATTR_SERVICE_NAME]: serviceName,
+ [ATTR_SERVICE_VERSION]: serverConfig.serverVersion ?? "unknown",
+ });
+
+ // Configure span processors
+ const spanProcessors = [];
+
+ if (serverConfig.tracing.otlpEndpoint) {
+ // OTLP exporter (Jaeger, Zipkin, etc.)
+ const otlpExporter = new OTLPTraceExporter({
+ url: serverConfig.tracing.otlpEndpoint,
+ });
+ spanProcessors.push(new BatchSpanProcessor(otlpExporter));
+ logger.info(
+ `OTLP exporter configured: ${serverConfig.tracing.otlpEndpoint}`,
+ );
+ } else {
+ // Fallback to console exporter for development
+ spanProcessors.push(new SimpleSpanProcessor(new ConsoleSpanExporter()));
+ logger.info("Console span exporter configured (no OTLP endpoint set)");
+ }
+
+ tracerProvider = new NodeTracerProvider({
+ resource,
+ sampler: new ParentBasedSampler({
+ root: new TraceIdRatioBasedSampler(serverConfig.tracing.sampleRate),
+ }),
+ spanProcessors,
+ });
+
+ // Register the provider globally
+ tracerProvider.register();
+
+ isInitialized = true;
+ logger.info("OpenTelemetry tracing initialized successfully");
+}
+
+/**
+ * Shutdown the tracing infrastructure gracefully.
+ * Should be called on application shutdown.
+ */
+export async function shutdownTracing(): Promise<void> {
+ if (tracerProvider) {
+ await tracerProvider.shutdown();
+ logger.info("OpenTelemetry tracing shut down");
+ }
+}
+
+/**
+ * Get a tracer instance for creating spans.
+ * @param name - The name of the tracer (typically the module/component name)
+ */
+export function getTracer(name: string): Tracer {
+ return trace.getTracer(name);
+}
+
+/**
+ * Get the currently active span, if any.
+ */
+export function getActiveSpan(): Span | undefined {
+ return trace.getActiveSpan();
+}
+
+/**
+ * Get the current trace context.
+ */
+export function getActiveContext(): Context {
+ return context.active();
+}
+
+/**
+ * Execute a function within a new span.
+ * Automatically handles error recording and span status.
+ */
+export async function withSpan<T>(
+ tracer: Tracer,
+ spanName: string,
+ options: {
+ kind?: SpanKind;
+ attributes?: TracingAttributes;
+ },
+ fn: (span: Span) => Promise<T>,
+): Promise<T> {
+ return tracer.startActiveSpan(
+ spanName,
+ {
+ kind: options.kind ?? SpanKind.INTERNAL,
+ attributes: options.attributes,
+ },
+ async (span) => {
+ try {
+ const result = await fn(span);
+ span.setStatus({ code: SpanStatusCode.OK });
+ return result;
+ } catch (error) {
+ span.setStatus({
+ code: SpanStatusCode.ERROR,
+ message: error instanceof Error ? error.message : String(error),
+ });
+ span.recordException(
+ error instanceof Error ? error : new Error(String(error)),
+ );
+ throw error;
+ } finally {
+ span.end();
+ }
+ },
+ );
+}
+
+/**
+ * Execute a synchronous function within a new span.
+ */
+export function withSpanSync<T>(
+ tracer: Tracer,
+ spanName: string,
+ options: {
+ kind?: SpanKind;
+ attributes?: TracingAttributes;
+ },
+ fn: (span: Span) => T,
+): T {
+ const span = tracer.startSpan(spanName, {
+ kind: options.kind ?? SpanKind.INTERNAL,
+ attributes: options.attributes,
+ });
+
+ try {
+ const result = context.with(trace.setSpan(context.active(), span), () =>
+ fn(span),
+ );
+ span.setStatus({ code: SpanStatusCode.OK });
+ return result;
+ } catch (error) {
+ span.setStatus({
+ code: SpanStatusCode.ERROR,
+ message: error instanceof Error ? error.message : String(error),
+ });
+ span.recordException(
+ error instanceof Error ? error : new Error(String(error)),
+ );
+ throw error;
+ } finally {
+ span.end();
+ }
+}
+
+/**
+ * Add an event to the current active span.
+ */
+export function addSpanEvent(
+ name: string,
+ attributes?: Record<string, string | number | boolean>,
+): void {
+ const span = getActiveSpan();
+ if (span) {
+ span.addEvent(name, attributes);
+ }
+}
+
+/**
+ * Set attributes on the current active span.
+ */
+export function setSpanAttributes(attributes: TracingAttributes): void {
+ const span = getActiveSpan();
+ if (span) {
+ span.setAttributes(attributes);
+ }
+}
+
+/**
+ * Record an error on the current active span.
+ */
+export function recordSpanError(error: Error): void {
+ const span = getActiveSpan();
+ if (span) {
+ span.recordException(error);
+ span.setStatus({
+ code: SpanStatusCode.ERROR,
+ message: error.message,
+ });
+ }
+}
+
+/**
+ * Extract trace context from HTTP headers (for distributed tracing).
+ */
+export function extractTraceContext(
+ headers: Record<string, string | string[] | undefined>,
+): Context {
+ const normalizedHeaders: Record<string, string> = {};
+ for (const [key, value] of Object.entries(headers)) {
+ if (value) {
+ normalizedHeaders[key] = Array.isArray(value) ? value[0] : value;
+ }
+ }
+ return propagation.extract(context.active(), normalizedHeaders);
+}
+
+/**
+ * Inject trace context into HTTP headers (for distributed tracing).
+ */
+export function injectTraceContext(
+ headers: Record<string, string>,
+): Record<string, string> {
+ propagation.inject(context.active(), headers);
+ return headers;
+}
+
+/**
+ * Run a function within a specific context.
+ */
+export function runWithContext<T>(ctx: Context, fn: () => T): T {
+ return context.with(ctx, fn);
+}
+
+// Re-export commonly used types and constants
+export { SpanKind, SpanStatusCode } from "@opentelemetry/api";
diff --git a/packages/shared-server/src/tracingTypes.ts b/packages/shared-server/src/tracingTypes.ts
new file mode 100644
index 00000000..f0926b97
--- /dev/null
+++ b/packages/shared-server/src/tracingTypes.ts
@@ -0,0 +1,49 @@
+export type TracingAttributeKey =
+ // User attributes
+ | "user.id"
+ | "user.role"
+ | "user.tier"
+ // RPC attributes
+ | "rpc.system"
+ | "rpc.method"
+ | "rpc.type"
+ // Job attributes
+ | "job.id"
+ | "job.priority"
+ | "job.runNumber"
+ | "job.groupId"
+ // Bookmark attributes
+ | "bookmark.id"
+ | "bookmark.url"
+ | "bookmark.domain"
+ | "bookmark.content.size"
+ | "bookmark.content.type"
+ // Asset attributes
+ | "asset.id"
+ | "asset.type"
+ | "asset.size"
+ // Crawler-specific attributes
+ | "crawler.forceStorePdf"
+ | "crawler.archiveFullPage"
+ | "crawler.hasPrecrawledArchive"
+ | "crawler.getContentType.statusCode"
+ | "crawler.contentType"
+ | "crawler.statusCode"
+ // Database attributes
+ | "db.system"
+ | "db.statement"
+ | "db.operation"
+ // Inference-specific attributes
+ | "inference.tagging.numGeneratedTags"
+ | "inference.tagging.style"
+ | "inference.summary.size"
+ | "inference.lang"
+ | "inference.prompt.size"
+ | "inference.prompt.customCount"
+ | "inference.totalTokens"
+ | "inference.model"
+ | "inference.type";
+
+export type TracingAttributes = Partial<
+ Record<TracingAttributeKey, string | number | boolean>
+>;