diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-01-25 22:27:05 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2026-02-01 10:44:54 +0000 |
| commit | 9decab898983bc132835d4c517fc02aa695cb4af (patch) | |
| tree | ad7092f1b123412fcbf89f85298f830285bd8a05 | |
| parent | 95bfa5691ce0d43fbaa9c011df4ec9561635ff8d (diff) | |
| download | karakeep-9decab898983bc132835d4c517fc02aa695cb4af.tar.zst | |
refactor: lazy init background queues
| -rw-r--r-- | apps/workers/index.ts | 60 | ||||
| -rw-r--r-- | packages/plugins/queue-liteque/src/index.ts | 4 | ||||
| -rw-r--r-- | packages/plugins/queue-restate/src/index.ts | 4 | ||||
| -rw-r--r-- | packages/shared-server/src/queues.ts | 144 | ||||
| -rw-r--r-- | packages/shared/queueing.ts | 1 |
5 files changed, 155 insertions, 58 deletions
diff --git a/apps/workers/index.ts b/apps/workers/index.ts index 07840a4c..dfbac85b 100644 --- a/apps/workers/index.ts +++ b/apps/workers/index.ts @@ -3,11 +3,21 @@ import "dotenv/config"; import { buildServer } from "server"; import { + AdminMaintenanceQueue, + AssetPreprocessingQueue, + BackupQueue, + FeedQueue, initTracing, + LinkCrawlerQueue, loadAllPlugins, + OpenAIQueue, prepareQueue, + RuleEngineQueue, + SearchIndexingQueue, shutdownTracing, startQueue, + VideoWorkerQueue, + WebhookQueue, } from "@karakeep/shared-server"; import serverConfig from "@karakeep/shared/config"; import logger from "@karakeep/shared/logger"; @@ -25,16 +35,46 @@ import { VideoWorker } from "./workers/videoWorker"; import { WebhookWorker } from "./workers/webhookWorker"; const workerBuilders = { - crawler: () => CrawlerWorker.build(), - inference: () => OpenAiWorker.build(), - search: () => SearchIndexingWorker.build(), - adminMaintenance: () => AdminMaintenanceWorker.build(), - video: () => VideoWorker.build(), - feed: () => FeedWorker.build(), - assetPreprocessing: () => AssetPreprocessingWorker.build(), - webhook: () => WebhookWorker.build(), - ruleEngine: () => RuleEngineWorker.build(), - backup: () => BackupWorker.build(), + crawler: async () => { + await LinkCrawlerQueue.ensureInit(); + return CrawlerWorker.build(); + }, + inference: async () => { + await OpenAIQueue.ensureInit(); + return OpenAiWorker.build(); + }, + search: async () => { + await SearchIndexingQueue.ensureInit(); + return SearchIndexingWorker.build(); + }, + adminMaintenance: async () => { + await AdminMaintenanceQueue.ensureInit(); + return AdminMaintenanceWorker.build(); + }, + video: async () => { + await VideoWorkerQueue.ensureInit(); + return VideoWorker.build(); + }, + feed: async () => { + await FeedQueue.ensureInit(); + return FeedWorker.build(); + }, + assetPreprocessing: async () => { + await AssetPreprocessingQueue.ensureInit(); + return AssetPreprocessingWorker.build(); + }, + webhook: async () => { + await WebhookQueue.ensureInit(); + return WebhookWorker.build(); + }, + ruleEngine: async () => { + await RuleEngineQueue.ensureInit(); + return RuleEngineWorker.build(); + }, + backup: async () => { + await BackupQueue.ensureInit(); + return BackupWorker.build(); + }, } as const; type WorkerName = keyof typeof workerBuilders; diff --git a/packages/plugins/queue-liteque/src/index.ts b/packages/plugins/queue-liteque/src/index.ts index b809d158..48c4ed59 100644 --- a/packages/plugins/queue-liteque/src/index.ts +++ b/packages/plugins/queue-liteque/src/index.ts @@ -28,6 +28,10 @@ class LitequeQueueWrapper<T> implements Queue<T> { public readonly opts: QueueOptions, ) {} + ensureInit(): Promise<void> { + return Promise.resolve(); + } + name(): string { return this._name; } diff --git a/packages/plugins/queue-restate/src/index.ts b/packages/plugins/queue-restate/src/index.ts index 64572b03..f8761291 100644 --- a/packages/plugins/queue-restate/src/index.ts +++ b/packages/plugins/queue-restate/src/index.ts @@ -27,6 +27,10 @@ class RestateQueueWrapper<T> implements Queue<T> { public readonly opts: QueueOptions, ) {} + ensureInit(): Promise<void> { + return Promise.resolve(); + } + name(): string { return this._name; } diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts index 140d9c0b..38c7bfe1 100644 --- a/packages/shared-server/src/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -1,19 +1,75 @@ import { z } from "zod"; -import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { + EnqueueOptions, + getQueueClient, + Queue, + QueueClient, + QueueOptions, +} from "@karakeep/shared/queueing"; import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; import { loadAllPlugins } from "."; -await loadAllPlugins(); -const QUEUE_CLIENT = await getQueueClient(); +// Lazy client initialization - plugins are loaded on first access +// We cache the promise to ensure only one initialization happens even with concurrent calls +let clientPromise: Promise<QueueClient> | null = null; + +function getClient(): Promise<QueueClient> { + if (!clientPromise) { + clientPromise = (async () => { + await loadAllPlugins(); + return await getQueueClient(); + })(); + } + return clientPromise; +} + +/** + * Creates a deferred queue that initializes lazily on first use. + * This allows the module to be imported without requiring plugins to be loaded. + */ +function createDeferredQueue<T>(name: string, options: QueueOptions): Queue<T> { + // Cache the promise to ensure only one queue is created even with concurrent calls + let queuePromise: Promise<Queue<T>> | null = null; + + const ensureQueue = (): Promise<Queue<T>> => { + if (!queuePromise) { + queuePromise = (async () => { + const client = await getClient(); + return client.createQueue<T>(name, options); + })(); + } + return queuePromise; + }; + + return { + opts: options, + name: () => name, + ensureInit: async () => { + await ensureQueue(); + }, + async enqueue(payload: T, opts?: EnqueueOptions) { + return (await ensureQueue()).enqueue(payload, opts); + }, + async stats() { + return (await ensureQueue()).stats(); + }, + async cancelAllNonRunning() { + const q = await ensureQueue(); + return q.cancelAllNonRunning?.() ?? 0; + }, + }; +} export async function prepareQueue() { - await QUEUE_CLIENT.prepare(); + const client = await getClient(); + await client.prepare(); } export async function startQueue() { - await QUEUE_CLIENT.start(); + const client = await getClient(); + await client.start(); } // Link Crawler @@ -25,7 +81,7 @@ export const zCrawlLinkRequestSchema = z.object({ }); export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>; -export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>( +export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>( "link_crawler_queue", { defaultJobArgs: { @@ -42,15 +98,12 @@ export const zOpenAIRequestSchema = z.object({ }); export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>; -export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>( - "openai_queue", - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, +export const OpenAIQueue = createDeferredQueue<ZOpenAIRequest>("openai_queue", { + defaultJobArgs: { + numRetries: 3, }, -); + keepFailedJobs: false, +}); // Search Indexing Worker export const zSearchIndexingRequestSchema = z.object({ @@ -60,13 +113,15 @@ export const zSearchIndexingRequestSchema = z.object({ export type ZSearchIndexingRequest = z.infer< typeof zSearchIndexingRequestSchema >; -export const SearchIndexingQueue = - QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", { +export const SearchIndexingQueue = createDeferredQueue<ZSearchIndexingRequest>( + "searching_indexing", + { defaultJobArgs: { numRetries: 5, }, keepFailedJobs: false, - }); + }, +); // Admin maintenance worker export const zTidyAssetsRequestSchema = z.object({ @@ -96,13 +151,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract< { type: "migrate_large_link_html" } >; -export const AdminMaintenanceQueue = - QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", { +export const AdminMaintenanceQueue = createDeferredQueue<ZAdminMaintenanceTask>( + "admin_maintenance_queue", + { defaultJobArgs: { numRetries: 1, }, keepFailedJobs: false, - }); + }, +); export async function triggerSearchReindex( bookmarkId: string, @@ -126,7 +183,7 @@ export const zvideoRequestSchema = z.object({ }); export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>; -export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>( +export const VideoWorkerQueue = createDeferredQueue<ZVideoRequest>( "video_queue", { defaultJobArgs: { @@ -142,16 +199,13 @@ export const zFeedRequestSchema = z.object({ }); export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>; -export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>( - "feed_queue", - { - defaultJobArgs: { - // One retry is enough for the feed queue given that it's periodic - numRetries: 1, - }, - keepFailedJobs: false, +export const FeedQueue = createDeferredQueue<ZFeedRequestSchema>("feed_queue", { + defaultJobArgs: { + // One retry is enough for the feed queue given that it's periodic + numRetries: 1, }, -); + keepFailedJobs: false, +}); // Preprocess Assets export const zAssetPreprocessingRequestSchema = z.object({ @@ -162,15 +216,12 @@ export type AssetPreprocessingRequest = z.infer< typeof zAssetPreprocessingRequestSchema >; export const AssetPreprocessingQueue = - QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>( - "asset_preprocessing_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, + createDeferredQueue<AssetPreprocessingRequest>("asset_preprocessing_queue", { + defaultJobArgs: { + numRetries: 2, }, - ); + keepFailedJobs: false, + }); // Webhook worker export const zWebhookRequestSchema = z.object({ @@ -179,7 +230,7 @@ export const zWebhookRequestSchema = z.object({ userId: z.string().optional(), }); export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>; -export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>( +export const WebhookQueue = createDeferredQueue<ZWebhookRequest>( "webhook_queue", { defaultJobArgs: { @@ -211,7 +262,7 @@ export const zRuleEngineRequestSchema = z.object({ events: z.array(zRuleEngineEventSchema), }); export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>; -export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>( +export const RuleEngineQueue = createDeferredQueue<ZRuleEngineRequest>( "rule_engine_queue", { defaultJobArgs: { @@ -241,12 +292,9 @@ export const zBackupRequestSchema = z.object({ backupId: z.string().optional(), }); export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>; -export const BackupQueue = QUEUE_CLIENT.createQueue<ZBackupRequest>( - "backup_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, +export const BackupQueue = createDeferredQueue<ZBackupRequest>("backup_queue", { + defaultJobArgs: { + numRetries: 2, }, -); + keepFailedJobs: false, +}); diff --git a/packages/shared/queueing.ts b/packages/shared/queueing.ts index bc2c9cfa..d1f4bcef 100644 --- a/packages/shared/queueing.ts +++ b/packages/shared/queueing.ts @@ -63,6 +63,7 @@ export interface RunnerOptions<T> { export interface Queue<T> { opts: QueueOptions; + ensureInit(): Promise<void>; name(): string; enqueue(payload: T, options?: EnqueueOptions): Promise<string | undefined>; stats(): Promise<{ |
