diff options
| author | Mohamed Bassem <me@mbassem.com> | 2026-01-25 22:27:05 +0000 |
|---|---|---|
| committer | Mohamed Bassem <me@mbassem.com> | 2026-02-01 10:44:54 +0000 |
| commit | 9decab898983bc132835d4c517fc02aa695cb4af (patch) | |
| tree | ad7092f1b123412fcbf89f85298f830285bd8a05 /packages/shared-server/src | |
| parent | 95bfa5691ce0d43fbaa9c011df4ec9561635ff8d (diff) | |
| download | karakeep-9decab898983bc132835d4c517fc02aa695cb4af.tar.zst | |
refactor: lazy init background queues
Diffstat (limited to 'packages/shared-server/src')
| -rw-r--r-- | packages/shared-server/src/queues.ts | 144 |
1 files changed, 96 insertions, 48 deletions
diff --git a/packages/shared-server/src/queues.ts b/packages/shared-server/src/queues.ts index 140d9c0b..38c7bfe1 100644 --- a/packages/shared-server/src/queues.ts +++ b/packages/shared-server/src/queues.ts @@ -1,19 +1,75 @@ import { z } from "zod"; -import { EnqueueOptions, getQueueClient } from "@karakeep/shared/queueing"; +import { + EnqueueOptions, + getQueueClient, + Queue, + QueueClient, + QueueOptions, +} from "@karakeep/shared/queueing"; import { zRuleEngineEventSchema } from "@karakeep/shared/types/rules"; import { loadAllPlugins } from "."; -await loadAllPlugins(); -const QUEUE_CLIENT = await getQueueClient(); +// Lazy client initialization - plugins are loaded on first access +// We cache the promise to ensure only one initialization happens even with concurrent calls +let clientPromise: Promise<QueueClient> | null = null; + +function getClient(): Promise<QueueClient> { + if (!clientPromise) { + clientPromise = (async () => { + await loadAllPlugins(); + return await getQueueClient(); + })(); + } + return clientPromise; +} + +/** + * Creates a deferred queue that initializes lazily on first use. + * This allows the module to be imported without requiring plugins to be loaded. + */ +function createDeferredQueue<T>(name: string, options: QueueOptions): Queue<T> { + // Cache the promise to ensure only one queue is created even with concurrent calls + let queuePromise: Promise<Queue<T>> | null = null; + + const ensureQueue = (): Promise<Queue<T>> => { + if (!queuePromise) { + queuePromise = (async () => { + const client = await getClient(); + return client.createQueue<T>(name, options); + })(); + } + return queuePromise; + }; + + return { + opts: options, + name: () => name, + ensureInit: async () => { + await ensureQueue(); + }, + async enqueue(payload: T, opts?: EnqueueOptions) { + return (await ensureQueue()).enqueue(payload, opts); + }, + async stats() { + return (await ensureQueue()).stats(); + }, + async cancelAllNonRunning() { + const q = await ensureQueue(); + return q.cancelAllNonRunning?.() ?? 0; + }, + }; +} export async function prepareQueue() { - await QUEUE_CLIENT.prepare(); + const client = await getClient(); + await client.prepare(); } export async function startQueue() { - await QUEUE_CLIENT.start(); + const client = await getClient(); + await client.start(); } // Link Crawler @@ -25,7 +81,7 @@ export const zCrawlLinkRequestSchema = z.object({ }); export type ZCrawlLinkRequest = z.input<typeof zCrawlLinkRequestSchema>; -export const LinkCrawlerQueue = QUEUE_CLIENT.createQueue<ZCrawlLinkRequest>( +export const LinkCrawlerQueue = createDeferredQueue<ZCrawlLinkRequest>( "link_crawler_queue", { defaultJobArgs: { @@ -42,15 +98,12 @@ export const zOpenAIRequestSchema = z.object({ }); export type ZOpenAIRequest = z.infer<typeof zOpenAIRequestSchema>; -export const OpenAIQueue = QUEUE_CLIENT.createQueue<ZOpenAIRequest>( - "openai_queue", - { - defaultJobArgs: { - numRetries: 3, - }, - keepFailedJobs: false, +export const OpenAIQueue = createDeferredQueue<ZOpenAIRequest>("openai_queue", { + defaultJobArgs: { + numRetries: 3, }, -); + keepFailedJobs: false, +}); // Search Indexing Worker export const zSearchIndexingRequestSchema = z.object({ @@ -60,13 +113,15 @@ export const zSearchIndexingRequestSchema = z.object({ export type ZSearchIndexingRequest = z.infer< typeof zSearchIndexingRequestSchema >; -export const SearchIndexingQueue = - QUEUE_CLIENT.createQueue<ZSearchIndexingRequest>("searching_indexing", { +export const SearchIndexingQueue = createDeferredQueue<ZSearchIndexingRequest>( + "searching_indexing", + { defaultJobArgs: { numRetries: 5, }, keepFailedJobs: false, - }); + }, +); // Admin maintenance worker export const zTidyAssetsRequestSchema = z.object({ @@ -96,13 +151,15 @@ export type ZAdminMaintenanceMigrateLargeLinkHtmlTask = Extract< { type: "migrate_large_link_html" } >; -export const AdminMaintenanceQueue = - QUEUE_CLIENT.createQueue<ZAdminMaintenanceTask>("admin_maintenance_queue", { +export const AdminMaintenanceQueue = createDeferredQueue<ZAdminMaintenanceTask>( + "admin_maintenance_queue", + { defaultJobArgs: { numRetries: 1, }, keepFailedJobs: false, - }); + }, +); export async function triggerSearchReindex( bookmarkId: string, @@ -126,7 +183,7 @@ export const zvideoRequestSchema = z.object({ }); export type ZVideoRequest = z.infer<typeof zvideoRequestSchema>; -export const VideoWorkerQueue = QUEUE_CLIENT.createQueue<ZVideoRequest>( +export const VideoWorkerQueue = createDeferredQueue<ZVideoRequest>( "video_queue", { defaultJobArgs: { @@ -142,16 +199,13 @@ export const zFeedRequestSchema = z.object({ }); export type ZFeedRequestSchema = z.infer<typeof zFeedRequestSchema>; -export const FeedQueue = QUEUE_CLIENT.createQueue<ZFeedRequestSchema>( - "feed_queue", - { - defaultJobArgs: { - // One retry is enough for the feed queue given that it's periodic - numRetries: 1, - }, - keepFailedJobs: false, +export const FeedQueue = createDeferredQueue<ZFeedRequestSchema>("feed_queue", { + defaultJobArgs: { + // One retry is enough for the feed queue given that it's periodic + numRetries: 1, }, -); + keepFailedJobs: false, +}); // Preprocess Assets export const zAssetPreprocessingRequestSchema = z.object({ @@ -162,15 +216,12 @@ export type AssetPreprocessingRequest = z.infer< typeof zAssetPreprocessingRequestSchema >; export const AssetPreprocessingQueue = - QUEUE_CLIENT.createQueue<AssetPreprocessingRequest>( - "asset_preprocessing_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, + createDeferredQueue<AssetPreprocessingRequest>("asset_preprocessing_queue", { + defaultJobArgs: { + numRetries: 2, }, - ); + keepFailedJobs: false, + }); // Webhook worker export const zWebhookRequestSchema = z.object({ @@ -179,7 +230,7 @@ export const zWebhookRequestSchema = z.object({ userId: z.string().optional(), }); export type ZWebhookRequest = z.infer<typeof zWebhookRequestSchema>; -export const WebhookQueue = QUEUE_CLIENT.createQueue<ZWebhookRequest>( +export const WebhookQueue = createDeferredQueue<ZWebhookRequest>( "webhook_queue", { defaultJobArgs: { @@ -211,7 +262,7 @@ export const zRuleEngineRequestSchema = z.object({ events: z.array(zRuleEngineEventSchema), }); export type ZRuleEngineRequest = z.infer<typeof zRuleEngineRequestSchema>; -export const RuleEngineQueue = QUEUE_CLIENT.createQueue<ZRuleEngineRequest>( +export const RuleEngineQueue = createDeferredQueue<ZRuleEngineRequest>( "rule_engine_queue", { defaultJobArgs: { @@ -241,12 +292,9 @@ export const zBackupRequestSchema = z.object({ backupId: z.string().optional(), }); export type ZBackupRequest = z.infer<typeof zBackupRequestSchema>; -export const BackupQueue = QUEUE_CLIENT.createQueue<ZBackupRequest>( - "backup_queue", - { - defaultJobArgs: { - numRetries: 2, - }, - keepFailedJobs: false, +export const BackupQueue = createDeferredQueue<ZBackupRequest>("backup_queue", { + defaultJobArgs: { + numRetries: 2, }, -); + keepFailedJobs: false, +}); |
