From 9decab898983bc132835d4c517fc02aa695cb4af Mon Sep 17 00:00:00 2001 From: Mohamed Bassem Date: Sun, 25 Jan 2026 22:27:05 +0000 Subject: refactor: lazy init background queues --- packages/shared-server/src/queues.ts | 144 +++++++++++++++++++++++------------ 1 file changed, 96 insertions(+), 48 deletions(-) (limited to 'packages/shared-server/src/queues.ts') diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts index 140d9c0b..38c7bfe1 100644 --- a/packages/shared-server/src/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -1,19 +1,75 @@ import { z } from "zod"; -import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { + EnqueueOptions, + getQueueClient, + Queue, + QueueClient, + QueueOptions, +} from "@karakeep/shared/queueing"; import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; import { loadAllPlugins } from "."; -await loadAllPlugins(); -const QUEUE_CLIENT = await getQueueClient(); +// Lazy client initialization - plugins are loaded on first access +// We cache the promise to ensure only one initialization happens even with concurrent calls +let clientPromise: Promise | null = null; + +function getClient(): Promise { + if (!clientPromise) { + clientPromise = (async () => { + await loadAllPlugins(); + return await getQueueClient(); + })(); + } + return clientPromise; +} + +/** + * Creates a deferred queue that initializes lazily on first use. + * This allows the module to be imported without requiring plugins to be loaded. + */ +function createDeferredQueue(name: string, options: QueueOptions): Queue { + // Cache the promise to ensure only one queue is created even with concurrent calls + let queuePromise: Promise> | null = null; + + const ensureQueue = (): Promise> => { + if (!queuePromise) { + queuePromise = (async () => { + const client = await getClient(); + return client.createQueue(name, options); + })(); + } + return queuePromise; + }; + + return { + opts: options, + name: () => name, + ensureInit: async () => { + await ensureQueue(); + }, + async enqueue(payload: T, opts?: EnqueueOptions) { + return (await ensureQueue()).enqueue(payload, opts); + }, + async stats() { + return (await ensureQueue()).stats(); + }, + async cancelAllNonRunning() { + const q = await ensureQueue(); + return q.cancelAllNonRunning?.() ?? 0; + }, + }; +} export async function prepareQueue() { - await QUEUE_CLIENT.prepare(); + const client = await getClient(); + await client.prepare(); } export async function startQueue() { - await QUEUE_CLIENT.start(); + const client = await getClient(); + await client.start(); } // Link Crawler @@ -25,7 +81,7 @@ export const zCrawlLinkRequestSchema = z.object({ }); export type ZCrawlLinkRequest = z.input; -export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue( +export const LinkCrawlerQueue = createDeferredQueue( "link_crawler_queue", { defaultJobArgs: { @@ -42,15 +98,12 @@ export const zOpenAIRequestSchema = z.object({ }); export type ZOpenAIRequest = z.infer; -export const OpenAIQueue = QUEUE_CLIENT.createQueue( - "openai_queue", - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, +export const OpenAIQueue = createDeferredQueue("openai_queue", { + defaultJobArgs: { + numRetries: 3, }, -); + keepFailedJobs: false, +}); // Search Indexing Worker export const zSearchIndexingRequestSchema = z.object({ @@ -60,13 +113,15 @@ export const zSearchIndexingRequestSchema = z.object({ export type ZSearchIndexingRequest = z.infer< typeof zSearchIndexingRequestSchema >; -export const SearchIndexingQueue = - QUEUE_CLIENT.createQueue("searching_indexing", { +export const SearchIndexingQueue = createDeferredQueue( + "searching_indexing", + { defaultJobArgs: { numRetries: 5, }, keepFailedJobs: false, - }); + }, +); // Admin maintenance worker export const zTidyAssetsRequestSchema = z.object({ @@ -96,13 +151,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract< { type: "migrate_large_link_html" } >; -export const AdminMaintenanceQueue = - QUEUE_CLIENT.createQueue("admin_maintenance_queue", { +export const AdminMaintenanceQueue = createDeferredQueue( + "admin_maintenance_queue", + { defaultJobArgs: { numRetries: 1, }, keepFailedJobs: false, - }); + }, +); export async function triggerSearchReindex( bookmarkId: string, @@ -126,7 +183,7 @@ export const zvideoRequestSchema = z.object({ }); export type ZVideoRequest = z.infer; -export const VideoWorkerQueue = QUEUE_CLIENT.createQueue( +export const VideoWorkerQueue = createDeferredQueue( "video_queue", { defaultJobArgs: { @@ -142,16 +199,13 @@ export const zFeedRequestSchema = z.object({ }); export type ZFeedRequestSchema = z.infer; -export const FeedQueue = QUEUE_CLIENT.createQueue( - "feed_queue", - { - defaultJobArgs: { - // One retry is enough for the feed queue given that it's periodic - numRetries: 1, - }, - keepFailedJobs: false, +export const FeedQueue = createDeferredQueue("feed_queue", { + defaultJobArgs: { + // One retry is enough for the feed queue given that it's periodic + numRetries: 1, }, -); + keepFailedJobs: false, +}); // Preprocess Assets export const zAssetPreprocessingRequestSchema = z.object({ @@ -162,15 +216,12 @@ export type AssetPreprocessingRequest = z.infer< typeof zAssetPreprocessingRequestSchema >; export const AssetPreprocessingQueue = - QUEUE_CLIENT.createQueue( - "asset_preprocessing_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, + createDeferredQueue("asset_preprocessing_queue", { + defaultJobArgs: { + numRetries: 2, }, - ); + keepFailedJobs: false, + }); // Webhook worker export const zWebhookRequestSchema = z.object({ @@ -179,7 +230,7 @@ export const zWebhookRequestSchema = z.object({ userId: z.string().optional(), }); export type ZWebhookRequest = z.infer; -export const WebhookQueue = QUEUE_CLIENT.createQueue( +export const WebhookQueue = createDeferredQueue( "webhook_queue", { defaultJobArgs: { @@ -211,7 +262,7 @@ export const zRuleEngineRequestSchema = z.object({ events: z.array(zRuleEngineEventSchema), }); export type ZRuleEngineRequest = z.infer; -export const RuleEngineQueue = QUEUE_CLIENT.createQueue( +export const RuleEngineQueue = createDeferredQueue( "rule_engine_queue", { defaultJobArgs: { @@ -241,12 +292,9 @@ export const zBackupRequestSchema = z.object({ backupId: z.string().optional(), }); export type ZBackupRequest = z.infer; -export const BackupQueue = QUEUE_CLIENT.createQueue( - "backup_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, +export const BackupQueue = createDeferredQueue("backup_queue", { + defaultJobArgs: { + numRetries: 2, }, -); + keepFailedJobs: false, +}); -- cgit v1.2.3-70-g09d2